from __future__ import absolute_import, division

import collections
import copy
import errno
import io
import logging
from random import shuffle, uniform

# selectors in stdlib as of py3.4
try:
    import selectors  # pylint: disable=import-error
except ImportError:
    # vendored backport module
    from kafka.vendor import selectors34 as selectors

import socket
import struct
import sys
import threading
import time

from kafka.vendor import six

import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.admin import SaslHandShakeRequest
from kafka.protocol.commit import OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.parser import KafkaProtocol
from kafka.protocol.types import Int32, Int8
from kafka.version import __version__


if six.PY2:
    ConnectionError = socket.error
    BlockingIOError = Exception

log = logging.getLogger(__name__)

DEFAULT_KAFKA_PORT = 9092

SASL_QOP_AUTH = 1
SASL_QOP_AUTH_INT = 2
SASL_QOP_AUTH_CONF = 4

try:
    import ssl
    ssl_available = True
    try:
        SSLEOFError = ssl.SSLEOFError
        SSLWantReadError = ssl.SSLWantReadError
        SSLWantWriteError = ssl.SSLWantWriteError
        SSLZeroReturnError = ssl.SSLZeroReturnError
    except AttributeError:
        # support older ssl libraries
        log.warning('Old SSL module detected.'
                    ' SSL error handling may not operate cleanly.'
                    ' Consider upgrading to Python 3.3 or 2.7.9')
        SSLEOFError = ssl.SSLError
        SSLWantReadError = ssl.SSLError
        SSLWantWriteError = ssl.SSLError
        SSLZeroReturnError = ssl.SSLError
except ImportError:
    # support Python without ssl libraries
    ssl_available = False
    class SSLWantReadError(Exception):
        pass
    class SSLWantWriteError(Exception):
        pass

# needed for SASL_GSSAPI authentication:
try:
    import gssapi
    from gssapi.raw.misc import GSSError
except ImportError:
    # no gssapi available, will disable gssapi mechanism
    gssapi = None
    GSSError = None


AFI_NAMES = {
    socket.AF_UNSPEC: "unspecified",
    socket.AF_INET: "IPv4",
    socket.AF_INET6: "IPv6",
}


class ConnectionStates(object):
    DISCONNECTING = '<disconnecting>'
    DISCONNECTED = '<disconnected>'
    CONNECTING = '<connecting>'
    HANDSHAKE = '<handshake>'
    CONNECTED = '<connected>'
    AUTHENTICATING = '<authenticating>'


class BrokerConnection(object):
    """Initialize a Kafka broker connection

    Keyword Arguments:
        client_id (str): a name for this client. This string is passed in
            each request to servers and can be used to identify specific
            server-side log entries that correspond to this client. Also
            submitted to GroupCoordinator for logging with respect to
            consumer group administration. Default: 'kafka-python-{version}'
        reconnect_backoff_ms (int): The amount of time in milliseconds to
            wait before attempting to reconnect to a given host.
            Default: 50.
        reconnect_backoff_max_ms (int): The maximum amount of time in
            milliseconds to wait when reconnecting to a broker that has
            repeatedly failed to connect. If provided, the backoff per host
            will increase exponentially for each consecutive connection
            failure, up to this maximum. To avoid connection storms, a
            randomization factor of 0.2 will be applied to the backoff
            resulting in a random range between 20% below and 20% above
            the computed value. Default: 1000.
        request_timeout_ms (int): Client request timeout in milliseconds.
            Default: 30000.
        max_in_flight_requests_per_connection (int): Requests are pipelined
            to kafka brokers up to this number of maximum requests per
            broker connection. Default: 5.
        receive_buffer_bytes (int): The size of the TCP receive buffer
            (SO_RCVBUF) to use when reading data. Default: None (relies on
            system defaults). Java client defaults to 32768.
        send_buffer_bytes (int): The size of the TCP send buffer
            (SO_SNDBUF) to use when sending data. Default: None (relies on
            system defaults). Java client defaults to 131072.
        socket_options (list): List of tuple-arguments to socket.setsockopt
            to apply to broker connection sockets. Default:
            [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
        security_protocol (str): Protocol used to communicate with brokers.
            Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
            Default: PLAINTEXT.
        ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
            socket connections. If provided, all other ssl_* configurations
            will be ignored. Default: None.
        ssl_check_hostname (bool): flag to configure whether the ssl handshake
            should verify that the certificate matches the broker's hostname.
            Default: True.
        ssl_cafile (str): optional filename of ca file to use in certificate
            verification. Default: None.
        ssl_certfile (str): optional filename of file in pem format containing
            the client certificate, as well as any ca certificates needed to
            establish the certificate's authenticity. Default: None.
        ssl_keyfile (str): optional filename containing the client private key.
            Default: None.
        ssl_password (callable, str, bytes, bytearray): optional password or
            callable function that returns a password, for decrypting the
            client private key. Default: None.
        ssl_crlfile (str): optional filename containing the CRL to check for
            certificate expiration. By default, no CRL check is done. When
            providing a file, only the leaf certificate will be checked against
            this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+.
            Default: None.
        api_version (tuple): Specify which Kafka API version to use.
            Accepted values are: (0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9),
            (0, 10). Default: (0, 8, 2)
        api_version_auto_timeout_ms (int): number of milliseconds to throw a
            timeout exception from the constructor when checking the broker
            api version. Only applies if api_version is None.
        selector (selectors.BaseSelector): Provide a specific selector
            implementation to use for I/O multiplexing.
            Default: selectors.DefaultSelector
        state_change_callback (callable): function to be called when the
            connection state changes from CONNECTING to CONNECTED etc.
        metrics (kafka.metrics.Metrics): Optionally provide a metrics
            instance for capturing network IO stats. Default: None.
        metric_group_prefix (str): Prefix for metric names. Default: ''
        sasl_mechanism (str): Authentication mechanism when security_protocol
            is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
            PLAIN, GSSAPI. Default: PLAIN
        sasl_plain_username (str): username for sasl PLAIN authentication.
            Default: None
        sasl_plain_password (str): password for sasl PLAIN authentication.
            Default: None
        sasl_kerberos_service_name (str): Service name to include in GSSAPI
            sasl mechanism handshake. Default: 'kafka'
        sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
            sasl mechanism handshake. Default: one of bootstrap servers
    """
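
    # A rough usage sketch (illustrative only, not part of the class): the
    # connection is driven by non-blocking calls, so callers typically loop
    # on connect() until connected, then pair send() futures with recv().
    # The host, port, and request below are placeholder values.
    #
    #     conn = BrokerConnection('localhost', 9092, socket.AF_UNSPEC)
    #     conn.connect_blocking(timeout=5)          # or loop on conn.connect()
    #     if conn.connected():
    #         future = conn.send(MetadataRequest[0]([]))
    #         while not future.is_done:
    #             for response, f in conn.recv():
    #                 f.success(response)
    #     conn.close()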

    DEFAULT_CONFIG = {
        'client_id': 'kafka-python-' + __version__,
        'node_id': 0,
        'request_timeout_ms': 30000,
        'reconnect_backoff_ms': 50,
        'reconnect_backoff_max_ms': 1000,
        'max_in_flight_requests_per_connection': 5,
        'receive_buffer_bytes': None,
        'send_buffer_bytes': None,
        'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
        'sock_chunk_bytes': 4096,  # undocumented experimental option
        'sock_chunk_buffer_count': 1000,  # undocumented experimental option
        'security_protocol': 'PLAINTEXT',
        'ssl_context': None,
        'ssl_check_hostname': True,
        'ssl_cafile': None,
        'ssl_certfile': None,
        'ssl_keyfile': None,
        'ssl_crlfile': None,
        'ssl_password': None,
        'api_version': (0, 8, 2),  # default to most restrictive
        'selector': selectors.DefaultSelector,
        'state_change_callback': lambda conn: True,
        'metrics': None,
        'metric_group_prefix': '',
        'sasl_mechanism': 'PLAIN',
        'sasl_plain_username': None,
        'sasl_plain_password': None,
        'sasl_kerberos_service_name': 'kafka',
        'sasl_kerberos_domain_name': None
    }
    SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')
    SASL_MECHANISMS = ('PLAIN', 'GSSAPI')

    def __init__(self, host, port, afi, **configs):
        self.host = host
        self.port = port
        self.afi = afi
        self._sock_afi = afi
        self._sock_addr = None
        self._api_versions = None

        self.config = copy.copy(self.DEFAULT_CONFIG)
        for key in self.config:
            if key in configs:
                self.config[key] = configs[key]

        self.node_id = self.config.pop('node_id')

        if self.config['api_version'] is None:
            self.config['api_version'] = self.DEFAULT_CONFIG['api_version']

        if self.config['receive_buffer_bytes'] is not None:
            self.config['socket_options'].append(
                (socket.SOL_SOCKET, socket.SO_RCVBUF,
                 self.config['receive_buffer_bytes']))
        if self.config['send_buffer_bytes'] is not None:
            self.config['socket_options'].append(
                (socket.SOL_SOCKET, socket.SO_SNDBUF,
                 self.config['send_buffer_bytes']))

        assert self.config['security_protocol'] in self.SECURITY_PROTOCOLS, (
            'security_protocol must be in ' + ', '.join(self.SECURITY_PROTOCOLS))

        if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
            assert ssl_available, "Python wasn't built with SSL support"

        if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'):
            assert self.config['sasl_mechanism'] in self.SASL_MECHANISMS, (
                'sasl_mechanism must be in ' + ', '.join(self.SASL_MECHANISMS))
            if self.config['sasl_mechanism'] == 'PLAIN':
                assert self.config['sasl_plain_username'] is not None, 'sasl_plain_username required for PLAIN sasl'
                assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'
            if self.config['sasl_mechanism'] == 'GSSAPI':
                assert gssapi is not None, 'GSSAPI lib not available'
                assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl'

        # This is not a general lock / this class is not generally thread-safe yet
        # However, to avoid pushing responsibility for maintaining
        # per-connection locks to the upstream client, we will use this lock to
        # make sure that access to the protocol buffer is synchronized
        # when sends happen on multiple threads
        self._lock = threading.Lock()

        # the protocol parser instance manages actual tracking of the
        # sequence of in-flight requests to responses, which should
        # function like a FIFO queue. For additional request data,
        # including tracking request futures and timestamps, we
        # can use a simple dictionary of correlation_id => request data
        self.in_flight_requests = dict()

        self._protocol = KafkaProtocol(
            client_id=self.config['client_id'],
            api_version=self.config['api_version'])
        self.state = ConnectionStates.DISCONNECTED
        self._reset_reconnect_backoff()
        self._sock = None
        self._ssl_context = None
        if self.config['ssl_context'] is not None:
            self._ssl_context = self.config['ssl_context']
        self._sasl_auth_future = None
        self.last_attempt = 0
        self._gai = []
        self._sensors = None
        if self.config['metrics']:
            self._sensors = BrokerConnectionMetrics(self.config['metrics'],
                                                    self.config['metric_group_prefix'],
                                                    self.node_id)

    def _dns_lookup(self):
        self._gai = dns_lookup(self.host, self.port, self.afi)
        if not self._gai:
            log.error('DNS lookup failed for %s:%i (%s)',
                      self.host, self.port, self.afi)
            return False
        return True

    def _next_afi_sockaddr(self):
        if not self._gai:
            if not self._dns_lookup():
                return
        afi, _, __, ___, sockaddr = self._gai.pop(0)
        return (afi, sockaddr)

    def connect_blocking(self, timeout=float('inf')):
        if self.connected():
            return True
        timeout += time.time()
        # First attempt to perform dns lookup
        # note that the underlying interface, socket.getaddrinfo,
        # has no explicit timeout so we may exceed the user-specified timeout
        self._dns_lookup()

        # Loop once over all returned dns entries
        selector = None
        while self._gai:
            while time.time() < timeout:
                self.connect()
                if self.connected():
                    if selector is not None:
                        selector.close()
                    return True
                elif self.connecting():
                    if selector is None:
                        selector = self.config['selector']()
                        selector.register(self._sock, selectors.EVENT_WRITE)
                    selector.select(1)
                elif self.disconnected():
                    if selector is not None:
                        selector.close()
                        selector = None
                    break
            else:
                break
        return False

    def connect(self):
        """Attempt to connect and return ConnectionState"""
        if self.state is ConnectionStates.DISCONNECTED and not self.blacked_out():
            self.last_attempt = time.time()
            next_lookup = self._next_afi_sockaddr()
            if not next_lookup:
                self.close(Errors.KafkaConnectionError('DNS failure'))
                return
            else:
                log.debug('%s: creating new socket', self)
                self._sock_afi, self._sock_addr = next_lookup
                self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM)

            for option in self.config['socket_options']:
                log.debug('%s: setting socket option %s', self, option)
                self._sock.setsockopt(*option)

            self._sock.setblocking(False)
            self.state = ConnectionStates.CONNECTING
            if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
                self._wrap_ssl()
            # _wrap_ssl can alter the connection state -- disconnects on failure
            # so we need to double check that we are still connecting before proceeding
            if self.connecting():
                self.config['state_change_callback'](self)
                log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
                         self.port, self._sock_addr, AFI_NAMES[self._sock_afi])

        if self.state is ConnectionStates.CONNECTING:
            # in non-blocking mode, use repeated calls to socket.connect_ex
            # to check connection status
            ret = None
            try:
                ret = self._sock.connect_ex(self._sock_addr)
            except socket.error as err:
                ret = err.errno
            except ValueError as err:
                # Python 3.7 and higher raises ValueError if a socket
                # is already connected
                if sys.version_info >= (3, 7):
                    ret = None
                else:
                    raise

            # Connection succeeded
            if not ret or ret == errno.EISCONN:
                log.debug('%s: established TCP connection', self)
                if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
                    log.debug('%s: initiating SSL handshake', self)
                    self.state = ConnectionStates.HANDSHAKE
                elif self.config['security_protocol'] == 'SASL_PLAINTEXT':
                    log.debug('%s: initiating SASL authentication', self)
                    self.state = ConnectionStates.AUTHENTICATING
                else:
                    # security_protocol PLAINTEXT
                    log.info('%s: Connection complete.', self)
                    self.state = ConnectionStates.CONNECTED
                    self._reset_reconnect_backoff()
                self.config['state_change_callback'](self)

            # Connection failed
            # WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
            elif ret not in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK, 10022):
                log.error('Connect attempt to %s returned error %s.'
                          ' Disconnecting.', self, ret)
                errstr = errno.errorcode.get(ret, 'UNKNOWN')
                self.close(Errors.KafkaConnectionError('{} {}'.format(ret, errstr)))

            # Needs retry
            else:
                pass

        if self.state is ConnectionStates.HANDSHAKE:
            if self._try_handshake():
                log.debug('%s: completed SSL handshake.', self)
                if self.config['security_protocol'] == 'SASL_SSL':
                    log.debug('%s: initiating SASL authentication', self)
                    self.state = ConnectionStates.AUTHENTICATING
                else:
                    log.info('%s: Connection complete.', self)
                    self.state = ConnectionStates.CONNECTED
                self.config['state_change_callback'](self)

        if self.state is ConnectionStates.AUTHENTICATING:
            assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL')
            if self._try_authenticate():
                # _try_authenticate has side-effects: possibly disconnected on socket errors
                if self.state is ConnectionStates.AUTHENTICATING:
                    log.info('%s: Connection complete.', self)
                    self.state = ConnectionStates.CONNECTED
                    self._reset_reconnect_backoff()
                    self.config['state_change_callback'](self)

        if self.state not in (ConnectionStates.CONNECTED,
                              ConnectionStates.DISCONNECTED):
            # Connection timed out
            request_timeout = self.config['request_timeout_ms'] / 1000.0
            if time.time() > request_timeout + self.last_attempt:
                log.error('Connection attempt to %s timed out', self)
                self.close(Errors.KafkaConnectionError('timeout'))

        return self.state

    def _wrap_ssl(self):
        assert self.config['security_protocol'] in ('SSL', 'SASL_SSL')
        if self._ssl_context is None:
            log.debug('%s: configuring default SSL Context', self)
            self._ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)  # pylint: disable=no-member
            self._ssl_context.options |= ssl.OP_NO_SSLv2  # pylint: disable=no-member
            self._ssl_context.options |= ssl.OP_NO_SSLv3  # pylint: disable=no-member
            self._ssl_context.verify_mode = ssl.CERT_OPTIONAL
            if self.config['ssl_check_hostname']:
                self._ssl_context.check_hostname = True
            if self.config['ssl_cafile']:
                log.info('%s: Loading SSL CA from %s', self, self.config['ssl_cafile'])
                self._ssl_context.load_verify_locations(self.config['ssl_cafile'])
                self._ssl_context.verify_mode = ssl.CERT_REQUIRED
            if self.config['ssl_certfile'] and self.config['ssl_keyfile']:
                log.info('%s: Loading SSL Cert from %s', self, self.config['ssl_certfile'])
                log.info('%s: Loading SSL Key from %s', self, self.config['ssl_keyfile'])
                self._ssl_context.load_cert_chain(
                    certfile=self.config['ssl_certfile'],
                    keyfile=self.config['ssl_keyfile'],
                    password=self.config['ssl_password'])
            if self.config['ssl_crlfile']:
                if not hasattr(ssl, 'VERIFY_CRL_CHECK_LEAF'):
                    raise RuntimeError('This version of Python does not support ssl_crlfile!')
                log.info('%s: Loading SSL CRL from %s', self, self.config['ssl_crlfile'])
                self._ssl_context.load_verify_locations(self.config['ssl_crlfile'])
                # pylint: disable=no-member
                self._ssl_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
        log.debug('%s: wrapping socket in ssl context', self)
        try:
            self._sock = self._ssl_context.wrap_socket(
                self._sock,
                server_hostname=self.host,
                do_handshake_on_connect=False)
        except ssl.SSLError as e:
            log.exception('%s: Failed to wrap socket in SSLContext!', self)
            self.close(e)

    def _try_handshake(self):
        assert self.config['security_protocol'] in ('SSL', 'SASL_SSL')
        try:
            self._sock.do_handshake()
            return True
        # old ssl in python2.6 will swallow all SSLErrors here...
        except (SSLWantReadError, SSLWantWriteError):
            pass
        # python 3.7 throws OSError
        except OSError:
            pass
        except (SSLZeroReturnError, ConnectionError, SSLEOFError):
            log.warning('SSL connection closed by server during handshake.')
            self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake'))
        # Other SSLErrors will be raised to user

        return False

    def _try_authenticate(self):
        assert self.config['api_version'] is None or self.config['api_version'] >= (0, 10)

        if self._sasl_auth_future is None:
            # Build a SaslHandShakeRequest message
            request = SaslHandShakeRequest[0](self.config['sasl_mechanism'])
            future = Future()
            sasl_response = self._send(request)
            sasl_response.add_callback(self._handle_sasl_handshake_response, future)
            sasl_response.add_errback(lambda f, e: f.failure(e), future)
            self._sasl_auth_future = future

        for r, f in self.recv():
            f.success(r)

        # A connection error could trigger close() which will reset the future
        if self._sasl_auth_future is None:
            return False
        elif self._sasl_auth_future.failed():
            ex = self._sasl_auth_future.exception
            if not isinstance(ex, Errors.KafkaConnectionError):
                raise ex  # pylint: disable-msg=raising-bad-type
        return self._sasl_auth_future.succeeded()

    def _handle_sasl_handshake_response(self, future, response):
        error_type = Errors.for_code(response.error_code)
        if error_type is not Errors.NoError:
            error = error_type(self)
            self.close(error=error)
            return future.failure(error_type(self))

        if self.config['sasl_mechanism'] not in response.enabled_mechanisms:
            return future.failure(
                Errors.UnsupportedSaslMechanismError(
                    'Kafka broker does not support %s sasl mechanism. Enabled mechanisms are: %s'
                    % (self.config['sasl_mechanism'], response.enabled_mechanisms)))
        elif self.config['sasl_mechanism'] == 'PLAIN':
            return self._try_authenticate_plain(future)
        elif self.config['sasl_mechanism'] == 'GSSAPI':
            return self._try_authenticate_gssapi(future)
        else:
            return future.failure(
                Errors.UnsupportedSaslMechanismError(
                    'kafka-python does not support SASL mechanism %s' %
                    self.config['sasl_mechanism']))

    def _send_bytes_blocking(self, data):
        self._sock.settimeout(self.config['request_timeout_ms'] / 1000)
        total_sent = 0
        try:
            while total_sent < len(data):
                sent_bytes = self._sock.send(data[total_sent:])
                total_sent += sent_bytes
            if total_sent != len(data):
                raise ConnectionError('Buffer overrun during socket send')
            return total_sent
        finally:
            self._sock.settimeout(0.0)

    def _recv_bytes_blocking(self, n):
        self._sock.settimeout(self.config['request_timeout_ms'] / 1000)
        try:
            data = b''
            while len(data) < n:
                fragment = self._sock.recv(n - len(data))
                if not fragment:
                    raise ConnectionError('Connection reset during recv')
                data += fragment
            return data
        finally:
            self._sock.settimeout(0.0)

    def _try_authenticate_plain(self, future):
        if self.config['security_protocol'] == 'SASL_PLAINTEXT':
            log.warning('%s: Sending username and password in the clear', self)

        data = b''
        # Send PLAIN credentials per RFC-4616
        msg = bytes('\0'.join([self.config['sasl_plain_username'],
                               self.config['sasl_plain_username'],
                               self.config['sasl_plain_password']]).encode('utf-8'))
        size = Int32.encode(len(msg))
        try:
            self._send_bytes_blocking(size + msg)

            # The server will send a zero sized message (that is Int32(0)) on success.
            # The connection is closed on failure
            data = self._recv_bytes_blocking(4)

        except ConnectionError as e:
            log.exception("%s: Error receiving reply from server", self)
            error = Errors.KafkaConnectionError("%s: %s" % (self, e))
            self.close(error=error)
            return future.failure(error)

        if data != b'\x00\x00\x00\x00':
            error = Errors.AuthenticationFailedError('Unrecognized response during authentication')
            return future.failure(error)

        log.info('%s: Authenticated as %s via PLAIN', self, self.config['sasl_plain_username'])
        return future.success(True)
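
    # Illustration of the wire exchange above with hypothetical credentials:
    # username 'alice' and password 'secret' produce the RFC-4616 payload
    # b'alice\x00alice\x00secret' (the username is sent as both authorization
    # and authentication identity), framed as a 4-byte big-endian length (18)
    # followed by the payload; the broker replies with four zero bytes on success.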

    def _try_authenticate_gssapi(self, future):
        kerberos_domain_name = self.config['sasl_kerberos_domain_name'] or self.host
        auth_id = self.config['sasl_kerberos_service_name'] + '@' + kerberos_domain_name
        gssapi_name = gssapi.Name(
            auth_id,
            name_type=gssapi.NameType.hostbased_service
        ).canonicalize(gssapi.MechType.kerberos)
        log.debug('%s: GSSAPI name: %s', self, gssapi_name)

        # Establish security context and negotiate protection level
        # For reference RFC 2222, section 7.2.1
        try:
            # Exchange tokens until authentication either succeeds or fails
            client_ctx = gssapi.SecurityContext(name=gssapi_name, usage='initiate')
            received_token = None
            while not client_ctx.complete:
                # calculate an output token from kafka token (or None if first iteration)
                output_token = client_ctx.step(received_token)

                # pass output token to kafka, or send empty response if the security
                # context is complete (output token is None in that case)
                if output_token is None:
                    self._send_bytes_blocking(Int32.encode(0))
                else:
                    msg = output_token
                    size = Int32.encode(len(msg))
                    self._send_bytes_blocking(size + msg)

                # The server will send a token back. Processing of this token either
                # establishes a security context, or it needs further token exchange.
                # The gssapi will be able to identify the needed next step.
                # The connection is closed on failure.
                header = self._recv_bytes_blocking(4)
                (token_size,) = struct.unpack('>i', header)
                received_token = self._recv_bytes_blocking(token_size)

            # Process the security layer negotiation token, sent by the server
            # once the security context is established.

            # unwraps message containing supported protection levels and msg size
            msg = client_ctx.unwrap(received_token).message
            # Kafka currently doesn't support integrity or confidentiality security layers,
            # so we simply set QoP to 'auth' only (first octet). We reuse the max message
            # size proposed by the server
            msg = Int8.encode(SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0:1]))) + msg[1:]
            # add authorization identity to the response, GSS-wrap and send it
            msg = client_ctx.wrap(msg + auth_id.encode(), False).message
            size = Int32.encode(len(msg))
            self._send_bytes_blocking(size + msg)

        except ConnectionError as e:
            log.exception("%s: Error receiving reply from server", self)
            error = Errors.KafkaConnectionError("%s: %s" % (self, e))
            self.close(error=error)
            return future.failure(error)
        except Exception as e:
            return future.failure(e)

        log.info('%s: Authenticated as %s via GSSAPI', self, gssapi_name)
        return future.success(True)

    def blacked_out(self):
        """
        Return true if we are disconnected from the given node and can't
        re-establish a connection yet
        """
        if self.state is ConnectionStates.DISCONNECTED:
            if time.time() < self.last_attempt + self._reconnect_backoff:
                return True
        return False

    def connection_delay(self):
        """
        Return the number of milliseconds to wait, based on the connection
        state, before attempting to send data. When disconnected, this respects
        the reconnect backoff time. When connecting, returns 0 to allow
        non-blocking connect to finish. When connected, returns a very large
        number to handle slow/stalled connections.
        """
        time_waited = time.time() - (self.last_attempt or 0)
        if self.state is ConnectionStates.DISCONNECTED:
            return max(self._reconnect_backoff - time_waited, 0) * 1000
        elif self.connecting():
            return 0
        else:
            return float('inf')

    def connected(self):
        """Return True iff socket is connected."""
        return self.state is ConnectionStates.CONNECTED

    def connecting(self):
        """Returns True if still connecting (this may encompass several
        different states, such as SSL handshake, authorization, etc)."""
        return self.state in (ConnectionStates.CONNECTING,
                              ConnectionStates.HANDSHAKE,
                              ConnectionStates.AUTHENTICATING)

    def disconnected(self):
        """Return True iff socket is closed"""
        return self.state is ConnectionStates.DISCONNECTED

    def _reset_reconnect_backoff(self):
        self._failures = 0
        self._reconnect_backoff = self.config['reconnect_backoff_ms'] / 1000.0

    def _update_reconnect_backoff(self):
        # Do not mark as failure if there are more dns entries available to try
        if len(self._gai) > 0:
            return
        if self.config['reconnect_backoff_max_ms'] > self.config['reconnect_backoff_ms']:
            self._failures += 1
            self._reconnect_backoff = self.config['reconnect_backoff_ms'] * 2 ** (self._failures - 1)
            self._reconnect_backoff = min(self._reconnect_backoff, self.config['reconnect_backoff_max_ms'])
            self._reconnect_backoff *= uniform(0.8, 1.2)
            self._reconnect_backoff /= 1000.0
            log.debug('%s: reconnect backoff %s after %s failures', self, self._reconnect_backoff, self._failures)
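
    # Worked example of the backoff progression above (my own illustration,
    # using the default reconnect_backoff_ms=50 and reconnect_backoff_max_ms=1000):
    # failure 1 -> 50ms, failure 2 -> 100ms, failure 3 -> 200ms, failure 4 -> 400ms,
    # failure 5 -> 800ms, failure 6+ -> capped at 1000ms. Each value is then
    # jittered by uniform(0.8, 1.2) and converted to seconds before being stored
    # in self._reconnect_backoff.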

    def _close_socket(self):
        if self._sock:
            self._sock.close()
            self._sock = None

    def __del__(self):
        self._close_socket()

    def close(self, error=None):
        """Close socket and fail all in-flight-requests.

        Arguments:
            error (Exception, optional): pending in-flight-requests
                will be failed with this exception.
                Default: kafka.errors.KafkaConnectionError.
        """
        if self.state is ConnectionStates.DISCONNECTED:
            if error is not None:
                log.warning('%s: Duplicate close() with error: %s', self, error)
            return
        log.info('%s: Closing connection. %s', self, error or '')
        self.state = ConnectionStates.DISCONNECTING
        self.config['state_change_callback'](self)
        self._update_reconnect_backoff()
        self._close_socket()
        self.state = ConnectionStates.DISCONNECTED
        self._sasl_auth_future = None
        self._protocol = KafkaProtocol(
            client_id=self.config['client_id'],
            api_version=self.config['api_version'])
        if error is None:
            error = Errors.Cancelled(str(self))
        while self.in_flight_requests:
            (_correlation_id, (future, _timestamp)) = self.in_flight_requests.popitem()
            future.failure(error)
        self.config['state_change_callback'](self)

    def send(self, request, blocking=True):
        """Queue request for async network send, return Future()"""
        future = Future()
        if self.connecting():
            return future.failure(Errors.NodeNotReadyError(str(self)))
        elif not self.connected():
            return future.failure(Errors.KafkaConnectionError(str(self)))
        elif not self.can_send_more():
            return future.failure(Errors.TooManyInFlightRequests(str(self)))
        return self._send(request, blocking=blocking)

    def _send(self, request, blocking=True):
        assert self.state in (ConnectionStates.AUTHENTICATING, ConnectionStates.CONNECTED)
        future = Future()
        with self._lock:
            correlation_id = self._protocol.send_request(request)

        log.debug('%s Request %d: %s', self, correlation_id, request)
        if request.expect_response():
            sent_time = time.time()
            assert correlation_id not in self.in_flight_requests, 'Correlation ID already in-flight!'
            self.in_flight_requests[correlation_id] = (future, sent_time)
        else:
            future.success(None)

        # Attempt to replicate behavior from prior to introduction of
        # send_pending_requests() / async sends
        if blocking:
            self.send_pending_requests()

        return future

    def send_pending_requests(self):
        """Can block on network if request is larger than send_buffer_bytes"""
        if self.state not in (ConnectionStates.AUTHENTICATING,
                              ConnectionStates.CONNECTED):
            return Errors.NodeNotReadyError(str(self))
        data = self._protocol.send_bytes()
        try:
            # In the future we might manage an internal write buffer
            # and send bytes asynchronously. For now, just block
            # sending each request payload
            total_bytes = self._send_bytes_blocking(data)
            if self._sensors:
                self._sensors.bytes_sent.record(total_bytes)
            return total_bytes
        except ConnectionError as e:
            log.exception("Error sending request data to %s", self)
            error = Errors.KafkaConnectionError("%s: %s" % (self, e))
            self.close(error=error)
            return error

    def can_send_more(self):
        """Return True unless the number of in-flight requests has reached
        max_in_flight_requests_per_connection."""
        max_ifrs = self.config['max_in_flight_requests_per_connection']
        return len(self.in_flight_requests) < max_ifrs

    def recv(self):
        """Non-blocking network receive.

        Return list of (response, future) tuples
        """
        if not self.connected() and self.state is not ConnectionStates.AUTHENTICATING:
            log.warning('%s cannot recv: socket not connected', self)
            # If requests are pending, we should close the socket and
            # fail all the pending request futures
            if self.in_flight_requests:
                self.close(Errors.KafkaConnectionError('Socket not connected during recv with in-flight-requests'))
            return ()

        elif not self.in_flight_requests:
            log.warning('%s: No in-flight-requests to recv', self)
            return ()

        responses = self._recv()
        if not responses and self.requests_timed_out():
            log.warning('%s timed out after %s ms. Closing connection.',
                        self, self.config['request_timeout_ms'])
            self.close(error=Errors.RequestTimedOutError(
                'Request timed out after %s ms' %
                self.config['request_timeout_ms']))
            return ()

        # augment responses w/ correlation_id, future, and timestamp
        for i, (correlation_id, response) in enumerate(responses):
            try:
                (future, timestamp) = self.in_flight_requests.pop(correlation_id)
            except KeyError:
                self.close(Errors.KafkaConnectionError('Received unrecognized correlation id'))
                return ()
            latency_ms = (time.time() - timestamp) * 1000
            if self._sensors:
                self._sensors.request_time.record(latency_ms)

            log.debug('%s Response %d (%s ms): %s', self, correlation_id, latency_ms, response)
            responses[i] = (response, future)

        return responses

    def _recv(self):
        """Take all available bytes from socket, return list of any responses from parser"""
        recvd = []
        while len(recvd) < self.config['sock_chunk_buffer_count']:
            try:
                data = self._sock.recv(self.config['sock_chunk_bytes'])
                # We expect socket.recv to raise an exception if there are no
                # bytes available to read from the socket in non-blocking mode,
                # but if the socket is disconnected, we will get empty data
                # without an exception raised
                if not data:
                    log.error('%s: socket disconnected', self)
                    self.close(error=Errors.KafkaConnectionError('socket disconnected'))
                    return []
                else:
                    recvd.append(data)

            except SSLWantReadError:
                break
            except ConnectionError as e:
                if six.PY2 and e.errno == errno.EWOULDBLOCK:
                    break
                log.exception('%s: Error receiving network data'
                              ' closing socket', self)
                self.close(error=Errors.KafkaConnectionError(e))
                return []
            except BlockingIOError:
                if six.PY3:
                    break
                raise

        recvd_data = b''.join(recvd)
        if self._sensors:
            self._sensors.bytes_received.record(len(recvd_data))

        try:
            responses = self._protocol.receive_bytes(recvd_data)
        except Errors.KafkaProtocolError as e:
            self.close(e)
            return []
        else:
            return responses

    def requests_timed_out(self):
        if self.in_flight_requests:
            get_timestamp = lambda v: v[1]
            oldest_at = min(map(get_timestamp,
                                self.in_flight_requests.values()))
            timeout = self.config['request_timeout_ms'] / 1000.0
            if time.time() >= oldest_at + timeout:
                return True
        return False

    def _handle_api_version_response(self, response):
        error_type = Errors.for_code(response.error_code)
        assert error_type is Errors.NoError, "API version check failed"
        self._api_versions = dict([
            (api_key, (min_version, max_version))
            for api_key, min_version, max_version in response.api_versions
        ])
        return self._api_versions

    def get_api_versions(self):
        if self._api_versions is not None:
            return self._api_versions

        version = self.check_version()
        if version < (0, 10, 0):
            raise Errors.UnsupportedVersionError(
                "ApiVersion not supported by cluster version {} < 0.10.0"
                .format(version))
        # _api_versions is set as a side effect of check_version() on a cluster
        # that supports 0.10.0 or later
        return self._api_versions

    def _infer_broker_version_from_api_versions(self, api_versions):
        # The logic here is to check the list of supported request versions
        # in reverse order. As soon as we find one that works, return it
        test_cases = [
            # format (<broker version>, <needed struct>)
            ((1, 0, 0), MetadataRequest[5]),
            ((0, 11, 0), MetadataRequest[4]),
            ((0, 10, 2), OffsetFetchRequest[2]),
            ((0, 10, 1), MetadataRequest[2]),
        ]

        # Get the best match of test cases
        for broker_version, struct in sorted(test_cases, reverse=True):
            if struct.API_KEY not in api_versions:
                continue
            min_version, max_version = api_versions[struct.API_KEY]
            if min_version <= struct.API_VERSION <= max_version:
                return broker_version

        # We know that ApiVersionResponse is only supported in 0.10+
        # so if all else fails, choose that
        return (0, 10, 0)
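
    # Hypothetical illustration of the inference above: Metadata has API_KEY 3
    # and OffsetFetch has API_KEY 9, so a broker reporting
    # api_versions = {3: (0, 5), 9: (0, 3), ...} supports MetadataRequest[5]
    # and is inferred as at least (1, 0, 0), while one reporting {3: (0, 2), ...}
    # only matches MetadataRequest[2] and is inferred as (0, 10, 1).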

    def check_version(self, timeout=2, strict=False, topics=[]):
        """Attempt to guess the broker version.

        Note: This is a blocking call.

        Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...
        """
        timeout_at = time.time() + timeout
        log.info('Probing node %s broker version', self.node_id)
        # Monkeypatch some connection configurations to avoid timeouts
        override_config = {
            'request_timeout_ms': timeout * 1000,
            'max_in_flight_requests_per_connection': 5
        }
        stashed = {}
        for key in override_config:
            stashed[key] = self.config[key]
            self.config[key] = override_config[key]

        # kafka kills the connection when it doesn't recognize an API request
        # so we can send a test request and then follow immediately with a
        # vanilla MetadataRequest. If the server did not recognize the first
        # request, both will be failed with a ConnectionError that wraps
        # socket.error (32, 54, or 104)
        from kafka.protocol.admin import ApiVersionRequest, ListGroupsRequest
        from kafka.protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest

        test_cases = [
            # All cases starting from 0.10 will be based on ApiVersionResponse
            ((0, 10), ApiVersionRequest[0]()),
            ((0, 9), ListGroupsRequest[0]()),
            ((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')),
            ((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])),
            ((0, 8, 0), MetadataRequest[0](topics)),
        ]

        for version, request in test_cases:
            if not self.connect_blocking(timeout_at - time.time()):
                raise Errors.NodeNotReadyError()
            f = self.send(request)
            # HACK: sleeping to wait for socket to send bytes
            time.sleep(0.1)
            # when broker receives an unrecognized request API
            # it abruptly closes our socket.
            # so we attempt to send a second request immediately
            # that we believe it will definitely recognize (metadata)
            # the attempt to write to a disconnected socket should
            # immediately fail and allow us to infer that the prior
            # request was unrecognized
            mr = self.send(MetadataRequest[0](topics))

            selector = self.config['selector']()
            selector.register(self._sock, selectors.EVENT_READ)
            while not (f.is_done and mr.is_done):
                selector.select(1)
                for response, future in self.recv():
                    future.success(response)
            selector.close()

            if f.succeeded():
                if isinstance(request, ApiVersionRequest[0]):
                    # Starting from 0.10 kafka broker we determine version
                    # by looking at ApiVersionResponse
                    api_versions = self._handle_api_version_response(f.value)
                    version = self._infer_broker_version_from_api_versions(api_versions)
                log.info('Broker version identified as %s', '.'.join(map(str, version)))
                log.info('Set configuration api_version=%s to skip auto'
                         ' check_version requests on startup', version)
                break

            # Only enable strict checking to verify that we understand failure
            # modes. For most users, the fact that the request failed should be
            # enough to rule out a particular broker version.
            if strict:
                # If the socket flush hack did not work (which should force the
                # connection to close and fail all pending requests), then we
                # get a basic Request Timeout. This is not ideal, but we'll deal
                if isinstance(f.exception, Errors.RequestTimedOutError):
                    pass

                # 0.9 brokers do not close the socket on unrecognized api
                # requests (bug...). In this case we expect to see a correlation
                # id mismatch
                elif (isinstance(f.exception, Errors.CorrelationIdError) and
                      version == (0, 10)):
                    pass
                elif six.PY2:
                    assert isinstance(f.exception.args[0], socket.error)
                    assert f.exception.args[0].errno in (32, 54, 104)
                else:
                    assert isinstance(f.exception.args[0], ConnectionError)
            log.info("Broker is not v%s -- it did not recognize %s",
                     version, request.__class__.__name__)
        else:
            raise Errors.UnrecognizedBrokerVersion()

        for key in stashed:
            self.config[key] = stashed[key]
        return version

    def __str__(self):
        return "<BrokerConnection node_id=%s host=%s:%d %s [%s %s]>" % (
            self.node_id, self.host, self.port, self.state,
            AFI_NAMES[self._sock_afi], self._sock_addr)


class BrokerConnectionMetrics(object):
    def __init__(self, metrics, metric_group_prefix, node_id):
        self.metrics = metrics

        # Any broker may have registered summary metrics already
        # but if not, we need to create them so we can set as parents below
        all_conns_transferred = metrics.get_sensor('bytes-sent-received')
        if not all_conns_transferred:
            metric_group_name = metric_group_prefix + '-metrics'

            bytes_transferred = metrics.sensor('bytes-sent-received')
            bytes_transferred.add(metrics.metric_name(
                'network-io-rate', metric_group_name,
                'The average number of network operations (reads or writes) on all'
                ' connections per second.'), Rate(sampled_stat=Count()))

            bytes_sent = metrics.sensor('bytes-sent',
                                        parents=[bytes_transferred])
            bytes_sent.add(metrics.metric_name(
                'outgoing-byte-rate', metric_group_name,
                'The average number of outgoing bytes sent per second to all'
                ' servers.'), Rate())
            bytes_sent.add(metrics.metric_name(
                'request-rate', metric_group_name,
                'The average number of requests sent per second.'),
                Rate(sampled_stat=Count()))
            bytes_sent.add(metrics.metric_name(
                'request-size-avg', metric_group_name,
                'The average size of all requests in the window.'), Avg())
            bytes_sent.add(metrics.metric_name(
                'request-size-max', metric_group_name,
                'The maximum size of any request sent in the window.'), Max())

            bytes_received = metrics.sensor('bytes-received',
                                            parents=[bytes_transferred])
            bytes_received.add(metrics.metric_name(
                'incoming-byte-rate', metric_group_name,
                'Bytes/second read off all sockets'), Rate())
            bytes_received.add(metrics.metric_name(
                'response-rate', metric_group_name,
                'The average number of responses received per second.'),
                Rate(sampled_stat=Count()))

            request_latency = metrics.sensor('request-latency')
            request_latency.add(metrics.metric_name(
                'request-latency-avg', metric_group_name,
                'The average request latency in ms.'),
                Avg())
            request_latency.add(metrics.metric_name(
                'request-latency-max', metric_group_name,
                'The maximum request latency in ms.'),
                Max())

        # if one sensor of the metrics has been registered for the connection,
        # then all other sensors should have been registered; and vice versa
        node_str = 'node-{0}'.format(node_id)
        node_sensor = metrics.get_sensor(node_str + '.bytes-sent')
        if not node_sensor:
            metric_group_name = metric_group_prefix + '-node-metrics.' + node_str

            bytes_sent = metrics.sensor(
                node_str + '.bytes-sent',
                parents=[metrics.get_sensor('bytes-sent')])
            bytes_sent.add(metrics.metric_name(
                'outgoing-byte-rate', metric_group_name,
                'The average number of outgoing bytes sent per second.'),
                Rate())
            bytes_sent.add(metrics.metric_name(
                'request-rate', metric_group_name,
                'The average number of requests sent per second.'),
                Rate(sampled_stat=Count()))
            bytes_sent.add(metrics.metric_name(
                'request-size-avg', metric_group_name,
                'The average size of all requests in the window.'),
                Avg())
            bytes_sent.add(metrics.metric_name(
                'request-size-max', metric_group_name,
                'The maximum size of any request sent in the window.'),
                Max())

            bytes_received = metrics.sensor(
                node_str + '.bytes-received',
                parents=[metrics.get_sensor('bytes-received')])
            bytes_received.add(metrics.metric_name(
                'incoming-byte-rate', metric_group_name,
                'Bytes/second read off node-connection socket'),
                Rate())
            bytes_received.add(metrics.metric_name(
                'response-rate', metric_group_name,
                'The average number of responses received per second.'),
                Rate(sampled_stat=Count()))

            request_time = metrics.sensor(
                node_str + '.latency',
                parents=[metrics.get_sensor('request-latency')])
            request_time.add(metrics.metric_name(
                'request-latency-avg', metric_group_name,
                'The average request latency in ms.'),
                Avg())
            request_time.add(metrics.metric_name(
                'request-latency-max', metric_group_name,
                'The maximum request latency in ms.'),
                Max())

        self.bytes_sent = metrics.sensor(node_str + '.bytes-sent')
        self.bytes_received = metrics.sensor(node_str + '.bytes-received')
        self.request_time = metrics.sensor(node_str + '.latency')


def _address_family(address):
    """
    Attempt to determine the family of an address (or hostname)

    :return: either socket.AF_INET or socket.AF_INET6 or socket.AF_UNSPEC if
        the address family could not be determined
    """
    if address.startswith('[') and address.endswith(']'):
        return socket.AF_INET6
    for af in (socket.AF_INET, socket.AF_INET6):
        try:
            socket.inet_pton(af, address)
            return af
        except (ValueError, AttributeError, socket.error):
            continue
    return socket.AF_UNSPEC


def get_ip_port_afi(host_and_port_str):
    """
    Parse the IP and port from a string in the format of:

    * host_or_ip        <- Can be either IPv4 address literal or hostname/fqdn
    * host_or_ipv4:port <- Can be either IPv4 address literal or hostname/fqdn
    * [host_or_ip]      <- IPv6 address literal
    * [host_or_ip]:port <- IPv6 address literal

    .. note:: IPv6 address literals with ports *must* be enclosed in brackets

    .. note:: If the port is not specified, default will be returned.

    :return: tuple (host, port, afi), afi will be socket.AF_INET or socket.AF_INET6 or socket.AF_UNSPEC
    """
    host_and_port_str = host_and_port_str.strip()
    if host_and_port_str.startswith('['):
        af = socket.AF_INET6
        host, rest = host_and_port_str[1:].split(']')
        if rest:
            port = int(rest[1:])
        else:
            port = DEFAULT_KAFKA_PORT
        return host, port, af
    else:
        if ':' not in host_and_port_str:
            af = _address_family(host_and_port_str)
            return host_and_port_str, DEFAULT_KAFKA_PORT, af
        else:
            # now we have something with a colon in it and no square brackets. It could be
            # either an IPv6 address literal (e.g., "::1") or an IP:port pair or a host:port pair
            try:
                # if it decodes as an IPv6 address, use that
                socket.inet_pton(socket.AF_INET6, host_and_port_str)
                return host_and_port_str, DEFAULT_KAFKA_PORT, socket.AF_INET6
            except AttributeError:
                log.warning('socket.inet_pton not available on this platform.'
                            ' consider `pip install win_inet_pton`')
                pass
            except (ValueError, socket.error):
                # it's a host:port pair
                pass
            host, port = host_and_port_str.rsplit(':', 1)
            port = int(port)

            af = _address_family(host)
            return host, port, af
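
# Illustrative parses for the helper above (my own examples, hostnames are
# placeholders; not exhaustive):
#
#     get_ip_port_afi('kafka01')          -> ('kafka01', 9092, socket.AF_UNSPEC)
#     get_ip_port_afi('10.0.0.5:9093')    -> ('10.0.0.5', 9093, socket.AF_INET)
#     get_ip_port_afi('[::1]')            -> ('::1', 9092, socket.AF_INET6)
#     get_ip_port_afi('[fe80::1]:9094')   -> ('fe80::1', 9094, socket.AF_INET6)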


def collect_hosts(hosts, randomize=True):
    """
    Collects a comma-separated set of hosts (host:port) and optionally
    randomizes the returned list.
    """

    if isinstance(hosts, six.string_types):
        hosts = hosts.strip().split(',')

    result = []
    afi = socket.AF_INET
    for host_port in hosts:

        host, port, afi = get_ip_port_afi(host_port)

        if port < 0:
            port = DEFAULT_KAFKA_PORT

        result.append((host, port, afi))

    if randomize:
        shuffle(result)

    return result


def is_inet_4_or_6(gai):
    """Given a getaddrinfo struct, return True iff ipv4 or ipv6"""
    return gai[0] in (socket.AF_INET, socket.AF_INET6)


def dns_lookup(host, port, afi=socket.AF_UNSPEC):
    """Returns a list of getaddrinfo structs, optionally filtered to an afi (ipv4 / ipv6)"""
    # XXX: all DNS functions in Python are blocking. If we really
    # want to be non-blocking here, we need to use a 3rd-party
    # library like python-adns, or move resolution onto its
    # own thread. This will be subject to the default libc
    # name resolution timeout (5s on most Linux boxes)
    try:
        return list(filter(is_inet_4_or_6,
                           socket.getaddrinfo(host, port, afi,
                                              socket.SOCK_STREAM)))
    except socket.gaierror as ex:
        log.warning('DNS lookup failed for %s:%d,'
                    ' exception was %s. Is your'
                    ' advertised.listeners (called'
                    ' advertised.host.name before Kafka 9)'
                    ' correct and resolvable?',
                    host, port, ex)
        return []
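

# Rough usage sketch for the bootstrap helpers above (hostnames are
# placeholders; dns_lookup output depends on the local resolver/platform):
#
#     >>> collect_hosts('kafka1.example.com:9092,kafka2.example.com', randomize=False)
#     [('kafka1.example.com', 9092, <AF_UNSPEC>), ('kafka2.example.com', 9092, <AF_UNSPEC>)]
#     >>> dns_lookup('localhost', 9092, socket.AF_INET)
#     [(<AF_INET>, <SOCK_STREAM>, 6, '', ('127.0.0.1', 9092))]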