from __future__ import absolute_import, division

import collections
import copy
import errno
import io
import logging
from random import shuffle, uniform

# selectors in stdlib as of py3.4
try:
    import selectors  # pylint: disable=import-error
except ImportError:
    # vendored backport module
    from kafka.vendor import selectors34 as selectors

import socket
import struct
import sys
import threading
import time

from kafka.vendor import six

import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.admin import SaslHandShakeRequest
from kafka.protocol.commit import OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.parser import KafkaProtocol
from kafka.protocol.types import Int32, Int8
from kafka.version import __version__


if six.PY2:
    ConnectionError = socket.error
    BlockingIOError = Exception

log = logging.getLogger(__name__)

DEFAULT_KAFKA_PORT = 9092

SASL_QOP_AUTH = 1
SASL_QOP_AUTH_INT = 2
SASL_QOP_AUTH_CONF = 4

try:
    import ssl
    ssl_available = True
    try:
        SSLEOFError = ssl.SSLEOFError
        SSLWantReadError = ssl.SSLWantReadError
        SSLWantWriteError = ssl.SSLWantWriteError
        SSLZeroReturnError = ssl.SSLZeroReturnError
    except AttributeError:
        # support older ssl libraries
        log.warning('Old SSL module detected.'
                    ' SSL error handling may not operate cleanly.'
                    ' Consider upgrading to Python 3.3 or 2.7.9')
        SSLEOFError = ssl.SSLError
        SSLWantReadError = ssl.SSLError
        SSLWantWriteError = ssl.SSLError
        SSLZeroReturnError = ssl.SSLError
except ImportError:
    # support Python without ssl libraries
    ssl_available = False
    # stub all four ssl exception names so except clauses that reference
    # them never raise NameError if reached without ssl support
    class SSLEOFError(Exception):
        pass
    class SSLWantReadError(Exception):
        pass
    class SSLWantWriteError(Exception):
        pass
    class SSLZeroReturnError(Exception):
        pass

# needed for SASL_GSSAPI authentication:
try:
    import gssapi
    from gssapi.raw.misc import GSSError
except ImportError:
    # no gssapi available, will disable gssapi mechanism
    gssapi = None
    GSSError = None


AFI_NAMES = {
    socket.AF_UNSPEC: "unspecified",
    socket.AF_INET: "IPv4",
    socket.AF_INET6: "IPv6",
}


class ConnectionStates(object):
    DISCONNECTING = '<disconnecting>'
    DISCONNECTED = '<disconnected>'
    CONNECTING = '<connecting>'
    HANDSHAKE = '<handshake>'
    CONNECTED = '<connected>'
    AUTHENTICATING = '<authenticating>'


class BrokerConnection(object):
    """Initialize a Kafka broker connection

    Keyword Arguments:
        client_id (str): a name for this client. This string is passed in
            each request to servers and can be used to identify specific
            server-side log entries that correspond to this client. Also
            submitted to GroupCoordinator for logging with respect to
            consumer group administration. Default: 'kafka-python-{version}'
        reconnect_backoff_ms (int): The amount of time in milliseconds to
            wait before attempting to reconnect to a given host.
            Default: 50.
        reconnect_backoff_max_ms (int): The maximum amount of time in
            milliseconds to wait when reconnecting to a broker that has
            repeatedly failed to connect. If provided, the backoff per host
            will increase exponentially for each consecutive connection
            failure, up to this maximum. To avoid connection storms, a
            randomization factor of 0.2 will be applied to the backoff,
            resulting in a random range between 20% below and 20% above
            the computed value. Default: 1000.
        request_timeout_ms (int): Client request timeout in milliseconds.
            Default: 30000.
        max_in_flight_requests_per_connection (int): Requests are pipelined
            to kafka brokers up to this number of maximum requests per
            broker connection. Default: 5.
        receive_buffer_bytes (int): The size of the TCP receive buffer
            (SO_RCVBUF) to use when reading data. Default: None (relies on
            system defaults). Java client defaults to 32768.
        send_buffer_bytes (int): The size of the TCP send buffer
            (SO_SNDBUF) to use when sending data. Default: None (relies on
            system defaults). Java client defaults to 131072.
        socket_options (list): List of tuple-arguments to socket.setsockopt
            to apply to broker connection sockets. Default:
            [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
        security_protocol (str): Protocol used to communicate with brokers.
            Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
            Default: PLAINTEXT.
        ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
            socket connections. If provided, all other ssl_* configurations
            will be ignored. Default: None.
        ssl_check_hostname (bool): flag to configure whether ssl handshake
            should verify that the certificate matches the broker's hostname.
            Default: True.
        ssl_cafile (str): optional filename of ca file to use in certificate
            verification. Default: None.
        ssl_certfile (str): optional filename of file in pem format containing
            the client certificate, as well as any ca certificates needed to
            establish the certificate's authenticity. Default: None.
        ssl_keyfile (str): optional filename containing the client private key.
            Default: None.
        ssl_password (callable, str, bytes, bytearray): optional password or
            callable function that returns a password, for decrypting the
            client private key. Default: None.
        ssl_crlfile (str): optional filename containing the CRL to check for
            certificate expiration. By default, no CRL check is done. When
            providing a file, only the leaf certificate will be checked against
            this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+.
            Default: None.
        api_version (tuple): Specify which Kafka API version to use.
            Accepted values are: (0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9),
            (0, 10). Default: (0, 8, 2).
        api_version_auto_timeout_ms (int): number of milliseconds to wait
            while checking the broker api version before raising a timeout
            exception from the constructor. Only applies if api_version
            is None.
        selector (selectors.BaseSelector): Provide a specific selector
            implementation to use for I/O multiplexing.
            Default: selectors.DefaultSelector
        state_change_callback (callable): function to be called when the
            connection state changes from CONNECTING to CONNECTED, etc.
        metrics (kafka.metrics.Metrics): Optionally provide a metrics
            instance for capturing network IO stats. Default: None.
        metric_group_prefix (str): Prefix for metric names. Default: ''
        sasl_mechanism (str): Authentication mechanism when security_protocol
            is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
            PLAIN, GSSAPI. Default: PLAIN
        sasl_plain_username (str): username for sasl PLAIN authentication.
            Default: None
        sasl_plain_password (str): password for sasl PLAIN authentication.
            Default: None
        sasl_kerberos_service_name (str): Service name to include in GSSAPI
            sasl mechanism handshake. Default: 'kafka'
        sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
            sasl mechanism handshake. Default: one of bootstrap servers
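
    Example (illustrative sketch only; assumes a broker is reachable at
    localhost:9092 -- applications normally use KafkaClient / KafkaConsumer /
    KafkaProducer rather than driving BrokerConnection directly)::

        conn = BrokerConnection('localhost', 9092, socket.AF_UNSPEC)
        conn.connect_blocking()
        if conn.connected():
            future = conn.send(MetadataRequest[0]([]))  # returns a Future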
    """

    DEFAULT_CONFIG = {
        'client_id': 'kafka-python-' + __version__,
        'node_id': 0,
        'request_timeout_ms': 30000,
        'reconnect_backoff_ms': 50,
        'reconnect_backoff_max_ms': 1000,
        'max_in_flight_requests_per_connection': 5,
        'receive_buffer_bytes': None,
        'send_buffer_bytes': None,
        'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
        'sock_chunk_bytes': 4096,  # undocumented experimental option
        'sock_chunk_buffer_count': 1000,  # undocumented experimental option
        'security_protocol': 'PLAINTEXT',
        'ssl_context': None,
        'ssl_check_hostname': True,
        'ssl_cafile': None,
        'ssl_certfile': None,
        'ssl_keyfile': None,
        'ssl_crlfile': None,
        'ssl_password': None,
        'api_version': (0, 8, 2),  # default to most restrictive
        'selector': selectors.DefaultSelector,
        'state_change_callback': lambda conn: True,
        'metrics': None,
        'metric_group_prefix': '',
        'sasl_mechanism': 'PLAIN',
        'sasl_plain_username': None,
        'sasl_plain_password': None,
        'sasl_kerberos_service_name': 'kafka',
        'sasl_kerberos_domain_name': None
    }
    SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')
    SASL_MECHANISMS = ('PLAIN', 'GSSAPI')

    def __init__(self, host, port, afi, **configs):
        self.host = host
        self.port = port
        self.afi = afi
        self._sock_afi = afi
        self._sock_addr = None
        self._api_versions = None

        self.config = copy.copy(self.DEFAULT_CONFIG)
        for key in self.config:
            if key in configs:
                self.config[key] = configs[key]

        self.node_id = self.config.pop('node_id')

        if self.config['api_version'] is None:
            self.config['api_version'] = self.DEFAULT_CONFIG['api_version']

        if self.config['receive_buffer_bytes'] is not None:
            self.config['socket_options'].append(
                (socket.SOL_SOCKET, socket.SO_RCVBUF,
                 self.config['receive_buffer_bytes']))
        if self.config['send_buffer_bytes'] is not None:
            self.config['socket_options'].append(
                (socket.SOL_SOCKET, socket.SO_SNDBUF,
                 self.config['send_buffer_bytes']))

        assert self.config['security_protocol'] in self.SECURITY_PROTOCOLS, (
            'security_protocol must be in ' + ', '.join(self.SECURITY_PROTOCOLS))

        if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
            assert ssl_available, "Python wasn't built with SSL support"

        if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'):
            assert self.config['sasl_mechanism'] in self.SASL_MECHANISMS, (
                'sasl_mechanism must be in ' + ', '.join(self.SASL_MECHANISMS))
            if self.config['sasl_mechanism'] == 'PLAIN':
                assert self.config['sasl_plain_username'] is not None, 'sasl_plain_username required for PLAIN sasl'
                assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'
            if self.config['sasl_mechanism'] == 'GSSAPI':
                assert gssapi is not None, 'GSSAPI lib not available'
                assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl'

        # This is not a general lock / this class is not generally thread-safe yet
        # However, to avoid pushing responsibility for maintaining
        # per-connection locks to the upstream client, we will use this lock to
        # make sure that access to the protocol buffer is synchronized
        # when sends happen on multiple threads
        self._lock = threading.Lock()

        # the protocol parser instance manages actual tracking of the
        # sequence of in-flight requests to responses, which should
        # function like a FIFO queue. For additional request data,
        # including tracking request futures and timestamps, we
        # can use a simple dictionary of correlation_id => request data
        self.in_flight_requests = dict()

        self._protocol = KafkaProtocol(
            client_id=self.config['client_id'],
            api_version=self.config['api_version'])
        self.state = ConnectionStates.DISCONNECTED
        self._reset_reconnect_backoff()
        self._sock = None
        self._ssl_context = None
        if self.config['ssl_context'] is not None:
            self._ssl_context = self.config['ssl_context']
        self._sasl_auth_future = None
        self.last_attempt = 0
        self._gai = []
        self._sensors = None
        if self.config['metrics']:
            self._sensors = BrokerConnectionMetrics(self.config['metrics'],
                                                    self.config['metric_group_prefix'],
                                                    self.node_id)

    def _dns_lookup(self):
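        """Resolve self.host via DNS and cache the results; return True on success."""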
        self._gai = dns_lookup(self.host, self.port, self.afi)
        if not self._gai:
            log.error('DNS lookup failed for %s:%i (%s)',
                      self.host, self.port, self.afi)
            return False
        return True

    def _next_afi_sockaddr(self):
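        """Pop and return the next (afi, sockaddr) pair from cached DNS results, or None on lookup failure."""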
        if not self._gai:
            if not self._dns_lookup():
                return
        afi, _, __, ___, sockaddr = self._gai.pop(0)
        return (afi, sockaddr)

    def connect_blocking(self, timeout=float('inf')):
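        """Drive connect() in a loop until connected or the timeout (in seconds) expires; return True if connected."""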
        if self.connected():
            return True
        timeout += time.time()
        # First attempt to perform dns lookup
        # note that the underlying interface, socket.getaddrinfo,
        # has no explicit timeout so we may exceed the user-specified timeout
        self._dns_lookup()

        # Loop once over all returned dns entries
        selector = None
        while self._gai:
            while time.time() < timeout:
                self.connect()
                if self.connected():
                    if selector is not None:
                        selector.close()
                    return True
                elif self.connecting():
                    if selector is None:
                        selector = self.config['selector']()
                        selector.register(self._sock, selectors.EVENT_WRITE)
                    selector.select(1)
                elif self.disconnected():
                    if selector is not None:
                        selector.close()
                        selector = None
                    break
            else:
                break
        return False

    def connect(self):
        """Attempt to connect and return ConnectionState"""
        if self.state is ConnectionStates.DISCONNECTED and not self.blacked_out():
            self.last_attempt = time.time()
            next_lookup = self._next_afi_sockaddr()
            if not next_lookup:
                self.close(Errors.KafkaConnectionError('DNS failure'))
                return self.state
            else:
                log.debug('%s: creating new socket', self)
                self._sock_afi, self._sock_addr = next_lookup
                self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM)

            for option in self.config['socket_options']:
                log.debug('%s: setting socket option %s', self, option)
                self._sock.setsockopt(*option)

            self._sock.setblocking(False)
            self.state = ConnectionStates.CONNECTING
            if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
                self._wrap_ssl()
            # _wrap_ssl can alter the connection state -- disconnects on failure
            # so we need to double check that we are still connecting before proceeding
            if self.connecting():
                self.config['state_change_callback'](self)
                log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
                         self.port, self._sock_addr, AFI_NAMES[self._sock_afi])

        if self.state is ConnectionStates.CONNECTING:
            # in non-blocking mode, use repeated calls to socket.connect_ex
            # to check connection status
            ret = None
            try:
                ret = self._sock.connect_ex(self._sock_addr)
            except socket.error as err:
                ret = err.errno
            except ValueError:
                # Python 3.7 and higher raises ValueError if a socket
                # is already connected
                if sys.version_info >= (3, 7):
                    ret = None
                else:
                    raise

            # Connection succeeded
            if not ret or ret == errno.EISCONN:
                log.debug('%s: established TCP connection', self)
                if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
                    log.debug('%s: initiating SSL handshake', self)
                    self.state = ConnectionStates.HANDSHAKE
                elif self.config['security_protocol'] == 'SASL_PLAINTEXT':
                    log.debug('%s: initiating SASL authentication', self)
                    self.state = ConnectionStates.AUTHENTICATING
                else:
                    # security_protocol PLAINTEXT
                    log.info('%s: Connection complete.', self)
                    self.state = ConnectionStates.CONNECTED
                    self._reset_reconnect_backoff()
                self.config['state_change_callback'](self)

            # Connection failed
            # WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
            elif ret not in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK, 10022):
                log.error('Connect attempt to %s returned error %s.'
                          ' Disconnecting.', self, ret)
                errstr = errno.errorcode.get(ret, 'UNKNOWN')
                self.close(Errors.KafkaConnectionError('{} {}'.format(ret, errstr)))

            # Needs retry
            else:
                pass

        if self.state is ConnectionStates.HANDSHAKE:
            if self._try_handshake():
                log.debug('%s: completed SSL handshake.', self)
                if self.config['security_protocol'] == 'SASL_SSL':
                    log.debug('%s: initiating SASL authentication', self)
                    self.state = ConnectionStates.AUTHENTICATING
                else:
                    log.info('%s: Connection complete.', self)
                    self.state = ConnectionStates.CONNECTED
                    self._reset_reconnect_backoff()
                self.config['state_change_callback'](self)

        if self.state is ConnectionStates.AUTHENTICATING:
            assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL')
            if self._try_authenticate():
                # _try_authenticate has side-effects: possibly disconnected on socket errors
                if self.state is ConnectionStates.AUTHENTICATING:
                    log.info('%s: Connection complete.', self)
                    self.state = ConnectionStates.CONNECTED
                    self._reset_reconnect_backoff()
                    self.config['state_change_callback'](self)

        if self.state not in (ConnectionStates.CONNECTED,
                              ConnectionStates.DISCONNECTED):
            # Connection timed out
            request_timeout = self.config['request_timeout_ms'] / 1000.0
            if time.time() > request_timeout + self.last_attempt:
                log.error('Connection attempt to %s timed out', self)
                self.close(Errors.KafkaConnectionError('timeout'))

        return self.state

    def _wrap_ssl(self):
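        """Wrap self._sock in an SSL socket, building a default SSLContext if one was not provided."""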
        assert self.config['security_protocol'] in ('SSL', 'SASL_SSL')
        if self._ssl_context is None:
            log.debug('%s: configuring default SSL Context', self)
            self._ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)  # pylint: disable=no-member
            self._ssl_context.options |= ssl.OP_NO_SSLv2  # pylint: disable=no-member
            self._ssl_context.options |= ssl.OP_NO_SSLv3  # pylint: disable=no-member
            self._ssl_context.verify_mode = ssl.CERT_OPTIONAL
            if self.config['ssl_check_hostname']:
                self._ssl_context.check_hostname = True
            if self.config['ssl_cafile']:
                log.info('%s: Loading SSL CA from %s', self, self.config['ssl_cafile'])
                self._ssl_context.load_verify_locations(self.config['ssl_cafile'])
                self._ssl_context.verify_mode = ssl.CERT_REQUIRED
            if self.config['ssl_certfile'] and self.config['ssl_keyfile']:
                log.info('%s: Loading SSL Cert from %s', self, self.config['ssl_certfile'])
                log.info('%s: Loading SSL Key from %s', self, self.config['ssl_keyfile'])
                self._ssl_context.load_cert_chain(
                    certfile=self.config['ssl_certfile'],
                    keyfile=self.config['ssl_keyfile'],
                    password=self.config['ssl_password'])
            if self.config['ssl_crlfile']:
                if not hasattr(ssl, 'VERIFY_CRL_CHECK_LEAF'):
                    raise RuntimeError('This version of Python does not support ssl_crlfile!')
                log.info('%s: Loading SSL CRL from %s', self, self.config['ssl_crlfile'])
                self._ssl_context.load_verify_locations(self.config['ssl_crlfile'])
                # pylint: disable=no-member
                self._ssl_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
        log.debug('%s: wrapping socket in ssl context', self)
        try:
            self._sock = self._ssl_context.wrap_socket(
                self._sock,
                server_hostname=self.host,
                do_handshake_on_connect=False)
        except ssl.SSLError as e:
            log.exception('%s: Failed to wrap socket in SSLContext!', self)
            self.close(e)

    def _try_handshake(self):
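        """Attempt to complete the non-blocking SSL handshake; return True when done, False to retry later."""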
        assert self.config['security_protocol'] in ('SSL', 'SASL_SSL')
        try:
            self._sock.do_handshake()
            return True
        # old ssl in python2.6 will swallow all SSLErrors here...
        except (SSLWantReadError, SSLWantWriteError):
            pass
        except (SSLZeroReturnError, ConnectionError, SSLEOFError):
            log.warning('SSL connection closed by server during handshake.')
            self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake'))
        # python 3.7 throws OSError; catch it last so the more specific
        # connection-closed errors above are not swallowed
        except OSError:
            pass
        # Other SSLErrors will be raised to user

        return False

    def _try_authenticate(self):
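        """Drive the SASL handshake and authentication exchange; return True once authenticated."""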
        assert self.config['api_version'] is None or self.config['api_version'] >= (0, 10)

        if self._sasl_auth_future is None:
            # Build a SaslHandShakeRequest message
            request = SaslHandShakeRequest[0](self.config['sasl_mechanism'])
            future = Future()
            sasl_response = self._send(request)
            sasl_response.add_callback(self._handle_sasl_handshake_response, future)
            sasl_response.add_errback(lambda f, e: f.failure(e), future)
            self._sasl_auth_future = future

        for r, f in self.recv():
            f.success(r)

        # A connection error could trigger close() which will reset the future
        if self._sasl_auth_future is None:
            return False
        elif self._sasl_auth_future.failed():
            ex = self._sasl_auth_future.exception
            if not isinstance(ex, Errors.KafkaConnectionError):
                raise ex  # pylint: disable-msg=raising-bad-type
        return self._sasl_auth_future.succeeded()

    def _handle_sasl_handshake_response(self, future, response):
        error_type = Errors.for_code(response.error_code)
        if error_type is not Errors.NoError:
            error = error_type(self)
            self.close(error=error)
            return future.failure(error)

        if self.config['sasl_mechanism'] not in response.enabled_mechanisms:
            return future.failure(
                Errors.UnsupportedSaslMechanismError(
                    'Kafka broker does not support %s sasl mechanism. Enabled mechanisms are: %s'
                    % (self.config['sasl_mechanism'], response.enabled_mechanisms)))
        elif self.config['sasl_mechanism'] == 'PLAIN':
            return self._try_authenticate_plain(future)
        elif self.config['sasl_mechanism'] == 'GSSAPI':
            return self._try_authenticate_gssapi(future)
        else:
            return future.failure(
                Errors.UnsupportedSaslMechanismError(
                    'kafka-python does not support SASL mechanism %s' %
                    self.config['sasl_mechanism']))

    def _send_bytes_blocking(self, data):
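        """Blocking send of all bytes in data, bounded by request_timeout_ms; restores non-blocking mode after."""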
        self._sock.settimeout(self.config['request_timeout_ms'] / 1000)
        total_sent = 0
        try:
            while total_sent < len(data):
                sent_bytes = self._sock.send(data[total_sent:])
                total_sent += sent_bytes
            if total_sent != len(data):
                raise ConnectionError('Buffer overrun during socket send')
            return total_sent
        finally:
            self._sock.settimeout(0.0)

    def _recv_bytes_blocking(self, n):
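        """Blocking read of exactly n bytes, bounded by request_timeout_ms; restores non-blocking mode after."""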
        self._sock.settimeout(self.config['request_timeout_ms'] / 1000)
        try:
            data = b''
            while len(data) < n:
                fragment = self._sock.recv(n - len(data))
                if not fragment:
                    raise ConnectionError('Connection reset during recv')
                data += fragment
            return data
        finally:
            self._sock.settimeout(0.0)

    def _try_authenticate_plain(self, future):
        if self.config['security_protocol'] == 'SASL_PLAINTEXT':
            log.warning('%s: Sending username and password in the clear', self)

        data = b''
        # Send PLAIN credentials per RFC-4616
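        # The PLAIN frame is: [authzid] NUL authcid NUL passwd -- this client
        # sends the username as both the authorization and authentication
        # identity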
        msg = bytes('\0'.join([self.config['sasl_plain_username'],
                               self.config['sasl_plain_username'],
                               self.config['sasl_plain_password']]).encode('utf-8'))
        size = Int32.encode(len(msg))
        try:
            self._send_bytes_blocking(size + msg)

            # The server will send a zero sized message (that is Int32(0)) on success.
            # The connection is closed on failure
            data = self._recv_bytes_blocking(4)

        except ConnectionError as e:
            log.exception("%s: Error receiving reply from server", self)
            error = Errors.KafkaConnectionError("%s: %s" % (self, e))
            self.close(error=error)
            return future.failure(error)

        if data != b'\x00\x00\x00\x00':
            error = Errors.AuthenticationFailedError('Unrecognized response during authentication')
            return future.failure(error)

        log.info('%s: Authenticated as %s via PLAIN', self, self.config['sasl_plain_username'])
        return future.success(True)

    def _try_authenticate_gssapi(self, future):
        kerberos_domain_name = self.config['sasl_kerberos_domain_name'] or self.host
        auth_id = self.config['sasl_kerberos_service_name'] + '@' + kerberos_domain_name
        gssapi_name = gssapi.Name(
            auth_id,
            name_type=gssapi.NameType.hostbased_service
        ).canonicalize(gssapi.MechType.kerberos)
        log.debug('%s: GSSAPI name: %s', self, gssapi_name)

        # Establish security context and negotiate protection level
        # For reference RFC 2222, section 7.2.1
        try:
            # Exchange tokens until authentication either succeeds or fails
            client_ctx = gssapi.SecurityContext(name=gssapi_name, usage='initiate')
            received_token = None
            while not client_ctx.complete:
                # calculate an output token from kafka token (or None if first iteration)
                output_token = client_ctx.step(received_token)

                # pass output token to kafka, or send empty response if the security
                # context is complete (output token is None in that case)
                if output_token is None:
                    self._send_bytes_blocking(Int32.encode(0))
                else:
                    msg = output_token
                    size = Int32.encode(len(msg))
                    self._send_bytes_blocking(size + msg)

                # The server will send a token back. Processing of this token either
                # establishes a security context, or it needs further token exchange.
                # The gssapi will be able to identify the needed next step.
                # The connection is closed on failure.
                header = self._recv_bytes_blocking(4)
                (token_size,) = struct.unpack('>i', header)
                received_token = self._recv_bytes_blocking(token_size)

            # Process the security layer negotiation token, sent by the server
            # once the security context is established.

            # unwraps message containing supported protection levels and msg size
            msg = client_ctx.unwrap(received_token).message
            # Kafka currently doesn't support integrity or confidentiality security layers, so we
            # simply set QoP to 'auth' only (first octet). We reuse the max message size proposed
            # by the server
            msg = Int8.encode(SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0:1]))) + msg[1:]
            # add authorization identity to the response, GSS-wrap and send it
            msg = client_ctx.wrap(msg + auth_id.encode(), False).message
            size = Int32.encode(len(msg))
            self._send_bytes_blocking(size + msg)

        except ConnectionError as e:
            log.exception("%s: Error receiving reply from server", self)
            error = Errors.KafkaConnectionError("%s: %s" % (self, e))
            self.close(error=error)
            return future.failure(error)
        except Exception as e:
            return future.failure(e)

        log.info('%s: Authenticated as %s via GSSAPI', self, gssapi_name)
        return future.success(True)

    def blacked_out(self):
        """
        Return True if we are disconnected from this node and can't
        re-establish a connection yet
        """
        if self.state is ConnectionStates.DISCONNECTED:
            if time.time() < self.last_attempt + self._reconnect_backoff:
                return True
        return False

    def connection_delay(self):
        """
        Return the number of milliseconds to wait, based on the connection
        state, before attempting to send data. When disconnected, this respects
        the reconnect backoff time. When connecting, returns 0 to allow
        non-blocking connect to finish. When connected, returns a very large
        number to handle slow/stalled connections.
        """
        time_waited = time.time() - (self.last_attempt or 0)
        if self.state is ConnectionStates.DISCONNECTED:
            return max(self._reconnect_backoff - time_waited, 0) * 1000
        elif self.connecting():
            return 0
        else:
            return float('inf')

    def connected(self):
        """Return True iff socket is connected."""
        return self.state is ConnectionStates.CONNECTED

    def connecting(self):
        """Return True if still connecting (this may encompass several
        different states, such as SSL handshake, authentication, etc)."""
        return self.state in (ConnectionStates.CONNECTING,
                              ConnectionStates.HANDSHAKE,
                              ConnectionStates.AUTHENTICATING)

    def disconnected(self):
        """Return True iff socket is closed"""
        return self.state is ConnectionStates.DISCONNECTED

    def _reset_reconnect_backoff(self):
        self._failures = 0
        self._reconnect_backoff = self.config['reconnect_backoff_ms'] / 1000.0

    def _update_reconnect_backoff(self):
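        # Exponential backoff with jitter: with the default
        # reconnect_backoff_ms=50 and reconnect_backoff_max_ms=1000, the
        # computed backoff sequence is roughly 50ms, 100ms, 200ms, 400ms,
        # 800ms, then 1000ms (capped), each scaled by a random factor
        # in [0.8, 1.2]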
        # Do not mark as failure if there are more dns entries available to try
        if len(self._gai) > 0:
            return
        if self.config['reconnect_backoff_max_ms'] > self.config['reconnect_backoff_ms']:
            self._failures += 1
            self._reconnect_backoff = self.config['reconnect_backoff_ms'] * 2 ** (self._failures - 1)
            self._reconnect_backoff = min(self._reconnect_backoff, self.config['reconnect_backoff_max_ms'])
            self._reconnect_backoff *= uniform(0.8, 1.2)
            self._reconnect_backoff /= 1000.0
            log.debug('%s: reconnect backoff %s after %s failures', self, self._reconnect_backoff, self._failures)

    def _close_socket(self):
        if self._sock:
            self._sock.close()
            self._sock = None

    def __del__(self):
        self._close_socket()

    def close(self, error=None):
        """Close socket and fail all in-flight-requests.

        Arguments:
            error (Exception, optional): pending in-flight-requests
                will be failed with this exception.
                Default: kafka.errors.KafkaConnectionError.
        """
        if self.state is ConnectionStates.DISCONNECTED:
            if error is not None:
                log.warning('%s: Duplicate close() with error: %s', self, error)
            return
        log.info('%s: Closing connection. %s', self, error or '')
        self.state = ConnectionStates.DISCONNECTING
        self.config['state_change_callback'](self)
        self._update_reconnect_backoff()
        self._close_socket()
        self.state = ConnectionStates.DISCONNECTED
        self._sasl_auth_future = None
        self._protocol = KafkaProtocol(
            client_id=self.config['client_id'],
            api_version=self.config['api_version'])
        if error is None:
            error = Errors.Cancelled(str(self))
        while self.in_flight_requests:
            (_correlation_id, (future, _timestamp)) = self.in_flight_requests.popitem()
            future.failure(error)
        self.config['state_change_callback'](self)

    def send(self, request, blocking=True):
        """Queue request for async network send, return Future()"""
        future = Future()
        if self.connecting():
            return future.failure(Errors.NodeNotReadyError(str(self)))
        elif not self.connected():
            return future.failure(Errors.KafkaConnectionError(str(self)))
        elif not self.can_send_more():
            return future.failure(Errors.TooManyInFlightRequests(str(self)))
        return self._send(request, blocking=blocking)

    def _send(self, request, blocking=True):
        assert self.state in (ConnectionStates.AUTHENTICATING, ConnectionStates.CONNECTED)
        future = Future()
        with self._lock:
            correlation_id = self._protocol.send_request(request)

        log.debug('%s Request %d: %s', self, correlation_id, request)
        if request.expect_response():
            sent_time = time.time()
            assert correlation_id not in self.in_flight_requests, 'Correlation ID already in-flight!'
            self.in_flight_requests[correlation_id] = (future, sent_time)
        else:
            future.success(None)

        # Attempt to replicate behavior from prior to introduction of
        # send_pending_requests() / async sends
        if blocking:
            self.send_pending_requests()

        return future

    def send_pending_requests(self):
        """Can block on network if request is larger than send_buffer_bytes"""
        if self.state not in (ConnectionStates.AUTHENTICATING,
                              ConnectionStates.CONNECTED):
            return Errors.NodeNotReadyError(str(self))
        data = self._protocol.send_bytes()
        try:
            # In the future we might manage an internal write buffer
            # and send bytes asynchronously. For now, just block
            # sending each request payload
            total_bytes = self._send_bytes_blocking(data)
            if self._sensors:
                self._sensors.bytes_sent.record(total_bytes)
            return total_bytes
        except ConnectionError as e:
            log.exception("Error sending request data to %s", self)
            error = Errors.KafkaConnectionError("%s: %s" % (self, e))
            self.close(error=error)
            return error

    def can_send_more(self):
        """Return True unless the number of in-flight requests has reached max_in_flight_requests_per_connection."""
        max_ifrs = self.config['max_in_flight_requests_per_connection']
        return len(self.in_flight_requests) < max_ifrs

    def recv(self):
        """Non-blocking network receive.

        Return list of (response, future) tuples
        """
        if not self.connected() and self.state is not ConnectionStates.AUTHENTICATING:
            log.warning('%s: cannot recv: socket not connected', self)
            # If requests are pending, we should close the socket and
            # fail all the pending request futures
            if self.in_flight_requests:
                self.close(Errors.KafkaConnectionError('Socket not connected during recv with in-flight-requests'))
            return ()

        elif not self.in_flight_requests:
            log.warning('%s: No in-flight-requests to recv', self)
            return ()

        responses = self._recv()
        if not responses and self.requests_timed_out():
            log.warning('%s: timed out after %s ms. Closing connection.',
                        self, self.config['request_timeout_ms'])
            self.close(error=Errors.RequestTimedOutError(
                'Request timed out after %s ms' %
                self.config['request_timeout_ms']))
            return ()

        # augment responses w/ correlation_id, future, and timestamp
        for i, (correlation_id, response) in enumerate(responses):
            try:
                (future, timestamp) = self.in_flight_requests.pop(correlation_id)
            except KeyError:
                self.close(Errors.KafkaConnectionError('Received unrecognized correlation id'))
                return ()
            latency_ms = (time.time() - timestamp) * 1000
            if self._sensors:
                self._sensors.request_time.record(latency_ms)

            log.debug('%s Response %d (%s ms): %s', self, correlation_id, latency_ms, response)
            responses[i] = (response, future)

        return responses

    def _recv(self):
        """Take all available bytes from socket, return list of any responses from parser"""
        recvd = []
        while len(recvd) < self.config['sock_chunk_buffer_count']:
            try:
                data = self._sock.recv(self.config['sock_chunk_bytes'])
                # We expect socket.recv to raise an exception if there are no
                # bytes available to read from the socket in non-blocking mode,
                # but if the socket is disconnected, we will get empty data
                # without an exception raised
                if not data:
                    log.error('%s: socket disconnected', self)
                    self.close(error=Errors.KafkaConnectionError('socket disconnected'))
                    return []
                else:
                    recvd.append(data)

            except SSLWantReadError:
                break
            except ConnectionError as e:
                if six.PY2 and e.errno == errno.EWOULDBLOCK:
                    break
                log.exception('%s: Error receiving network data,'
                              ' closing socket', self)
                self.close(error=Errors.KafkaConnectionError(e))
                return []
            except BlockingIOError:
                if six.PY3:
                    break
                raise

        recvd_data = b''.join(recvd)
        if self._sensors:
            self._sensors.bytes_received.record(len(recvd_data))

        try:
            responses = self._protocol.receive_bytes(recvd_data)
        except Errors.KafkaProtocolError as e:
            self.close(e)
            return []
        else:
            return responses

    def requests_timed_out(self):
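        """Return True if the oldest in-flight request has been waiting longer than request_timeout_ms."""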
        if self.in_flight_requests:
            oldest_at = min(timestamp for (_future, timestamp)
                            in self.in_flight_requests.values())
            timeout = self.config['request_timeout_ms'] / 1000.0
            if time.time() >= oldest_at + timeout:
                return True
        return False

    def _handle_api_version_response(self, response):
        error_type = Errors.for_code(response.error_code)
        assert error_type is Errors.NoError, "API version check failed"
        self._api_versions = dict([
            (api_key, (min_version, max_version))
            for api_key, min_version, max_version in response.api_versions
        ])
        return self._api_versions

    def get_api_versions(self):
        if self._api_versions is not None:
            return self._api_versions

        version = self.check_version()
        if version < (0, 10, 0):
            raise Errors.UnsupportedVersionError(
                "ApiVersion not supported by cluster version {} < 0.10.0"
                .format(version))
        # _api_versions is set as a side effect of check_version() on a cluster
        # that supports 0.10.0 or later
        return self._api_versions

    def _infer_broker_version_from_api_versions(self, api_versions):
        # The logic here is to check the list of supported request versions
        # in reverse order. As soon as we find one that works, return it
        test_cases = [
            # format (<broker version>, <needed struct>)
            ((1, 0, 0), MetadataRequest[5]),
            ((0, 11, 0), MetadataRequest[4]),
            ((0, 10, 2), OffsetFetchRequest[2]),
            ((0, 10, 1), MetadataRequest[2]),
        ]

        # Get the best match of test cases
        for broker_version, struct in sorted(test_cases, reverse=True):
            if struct.API_KEY not in api_versions:
                continue
            min_version, max_version = api_versions[struct.API_KEY]
            if min_version <= struct.API_VERSION <= max_version:
                return broker_version

        # We know that ApiVersionResponse is only supported in 0.10+
        # so if all else fails, choose that
        return (0, 10, 0)

    def check_version(self, timeout=2, strict=False, topics=[]):
        """Attempt to guess the broker version.

        Note: This is a blocking call.

        Returns: version tuple, e.g. (0, 10), (0, 9), (0, 8, 2), ...
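
        Example (illustrative; assumes an already-constructed
        BrokerConnection instance named conn)::

            version = conn.check_version(timeout=5)
            # e.g. (0, 10) -- can be passed as api_version=... to skip
            # this probe on future connections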
        """
        timeout_at = time.time() + timeout
        log.info('Probing node %s broker version', self.node_id)
        # Monkeypatch some connection configurations to avoid timeouts
        override_config = {
            'request_timeout_ms': timeout * 1000,
            'max_in_flight_requests_per_connection': 5
        }
        stashed = {}
        for key in override_config:
            stashed[key] = self.config[key]
            self.config[key] = override_config[key]

        # kafka kills the connection when it doesn't recognize an API request
        # so we can send a test request and then follow immediately with a
        # vanilla MetadataRequest. If the server did not recognize the first
        # request, both will be failed with a ConnectionError that wraps
        # socket.error (32, 54, or 104)
        from kafka.protocol.admin import ApiVersionRequest, ListGroupsRequest
        from kafka.protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest

        test_cases = [
            # All cases starting from 0.10 will be based on ApiVersionResponse
            ((0, 10), ApiVersionRequest[0]()),
            ((0, 9), ListGroupsRequest[0]()),
            ((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')),
            ((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])),
            ((0, 8, 0), MetadataRequest[0](topics)),
        ]

        for version, request in test_cases:
            if not self.connect_blocking(timeout_at - time.time()):
                raise Errors.NodeNotReadyError()
            f = self.send(request)
            # HACK: sleeping to wait for socket to send bytes
            time.sleep(0.1)
            # when broker receives an unrecognized request API
            # it abruptly closes our socket.
            # so we attempt to send a second request immediately
            # that we believe it will definitely recognize (metadata)
            # the attempt to write to a disconnected socket should
            # immediately fail and allow us to infer that the prior
            # request was unrecognized
            mr = self.send(MetadataRequest[0](topics))

            selector = self.config['selector']()
            selector.register(self._sock, selectors.EVENT_READ)
            while not (f.is_done and mr.is_done):
                selector.select(1)
                for response, future in self.recv():
                    future.success(response)
            selector.close()

            if f.succeeded():
                if isinstance(request, ApiVersionRequest[0]):
                    # Starting from 0.10 kafka broker we determine version
                    # by looking at ApiVersionResponse
                    api_versions = self._handle_api_version_response(f.value)
                    version = self._infer_broker_version_from_api_versions(api_versions)
                log.info('Broker version identified as %s', '.'.join(map(str, version)))
                log.info('Set configuration api_version=%s to skip auto'
                         ' check_version requests on startup', version)
                break

            # Only enable strict checking to verify that we understand failure
            # modes. For most users, the fact that the request failed should be
            # enough to rule out a particular broker version.
            if strict:
                # If the socket flush hack did not work (which should force the
                # connection to close and fail all pending requests), then we
                # get a basic Request Timeout. This is not ideal, but we'll deal
                if isinstance(f.exception, Errors.RequestTimedOutError):
                    pass

                # 0.9 brokers do not close the socket on unrecognized api
                # requests (bug...). In this case we expect to see a correlation
                # id mismatch
                elif (isinstance(f.exception, Errors.CorrelationIdError) and
                      version == (0, 10)):
                    pass
                elif six.PY2:
                    assert isinstance(f.exception.args[0], socket.error)
                    assert f.exception.args[0].errno in (32, 54, 104)
                else:
                    assert isinstance(f.exception.args[0], ConnectionError)
            log.info("Broker is not v%s -- it did not recognize %s",
                     version, request.__class__.__name__)
        else:
            raise Errors.UnrecognizedBrokerVersion()

        for key in stashed:
            self.config[key] = stashed[key]
        return version

    def __str__(self):
        return "<BrokerConnection node_id=%s host=%s:%d %s [%s %s]>" % (
            self.node_id, self.host, self.port, self.state,
            AFI_NAMES[self._sock_afi], self._sock_addr)


class BrokerConnectionMetrics(object):
    def __init__(self, metrics, metric_group_prefix, node_id):
        self.metrics = metrics

        # Any broker may have registered summary metrics already
        # but if not, we need to create them so we can set as parents below
        all_conns_transferred = metrics.get_sensor('bytes-sent-received')
        if not all_conns_transferred:
            metric_group_name = metric_group_prefix + '-metrics'

            bytes_transferred = metrics.sensor('bytes-sent-received')
            bytes_transferred.add(metrics.metric_name(
                'network-io-rate', metric_group_name,
                'The average number of network operations (reads or writes) on all'
                ' connections per second.'), Rate(sampled_stat=Count()))

            bytes_sent = metrics.sensor('bytes-sent',
                                        parents=[bytes_transferred])
            bytes_sent.add(metrics.metric_name(
                'outgoing-byte-rate', metric_group_name,
                'The average number of outgoing bytes sent per second to all'
                ' servers.'), Rate())
            bytes_sent.add(metrics.metric_name(
                'request-rate', metric_group_name,
                'The average number of requests sent per second.'),
                Rate(sampled_stat=Count()))
            bytes_sent.add(metrics.metric_name(
                'request-size-avg', metric_group_name,
                'The average size of all requests in the window.'), Avg())
            bytes_sent.add(metrics.metric_name(
                'request-size-max', metric_group_name,
                'The maximum size of any request sent in the window.'), Max())

            bytes_received = metrics.sensor('bytes-received',
                                            parents=[bytes_transferred])
            bytes_received.add(metrics.metric_name(
                'incoming-byte-rate', metric_group_name,
                'Bytes/second read off all sockets'), Rate())
            bytes_received.add(metrics.metric_name(
                'response-rate', metric_group_name,
                'The average number of responses received per second.'),
                Rate(sampled_stat=Count()))

            request_latency = metrics.sensor('request-latency')
            request_latency.add(metrics.metric_name(
                'request-latency-avg', metric_group_name,
                'The average request latency in ms.'),
                Avg())
            request_latency.add(metrics.metric_name(
                'request-latency-max', metric_group_name,
                'The maximum request latency in ms.'),
                Max())

        # if one sensor of the metrics has been registered for the connection,
        # then all other sensors should have been registered; and vice versa
        node_str = 'node-{0}'.format(node_id)
        node_sensor = metrics.get_sensor(node_str + '.bytes-sent')
        if not node_sensor:
            metric_group_name = metric_group_prefix + '-node-metrics.' + node_str

            bytes_sent = metrics.sensor(
                node_str + '.bytes-sent',
                parents=[metrics.get_sensor('bytes-sent')])
            bytes_sent.add(metrics.metric_name(
                'outgoing-byte-rate', metric_group_name,
                'The average number of outgoing bytes sent per second.'),
                Rate())
            bytes_sent.add(metrics.metric_name(
                'request-rate', metric_group_name,
                'The average number of requests sent per second.'),
                Rate(sampled_stat=Count()))
            bytes_sent.add(metrics.metric_name(
                'request-size-avg', metric_group_name,
                'The average size of all requests in the window.'),
                Avg())
            bytes_sent.add(metrics.metric_name(
                'request-size-max', metric_group_name,
                'The maximum size of any request sent in the window.'),
                Max())

            bytes_received = metrics.sensor(
                node_str + '.bytes-received',
                parents=[metrics.get_sensor('bytes-received')])
            bytes_received.add(metrics.metric_name(
                'incoming-byte-rate', metric_group_name,
                'Bytes/second read off node-connection socket'),
                Rate())
            bytes_received.add(metrics.metric_name(
                'response-rate', metric_group_name,
                'The average number of responses received per second.'),
                Rate(sampled_stat=Count()))

            request_time = metrics.sensor(
                node_str + '.latency',
                parents=[metrics.get_sensor('request-latency')])
            request_time.add(metrics.metric_name(
                'request-latency-avg', metric_group_name,
                'The average request latency in ms.'),
                Avg())
            request_time.add(metrics.metric_name(
                'request-latency-max', metric_group_name,
                'The maximum request latency in ms.'),
                Max())

        self.bytes_sent = metrics.sensor(node_str + '.bytes-sent')
        self.bytes_received = metrics.sensor(node_str + '.bytes-received')
        self.request_time = metrics.sensor(node_str + '.latency')


def _address_family(address):
    """
        Attempt to determine the family of an address (or hostname)

        :return: either socket.AF_INET or socket.AF_INET6 or socket.AF_UNSPEC if the address family
                 could not be determined
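
        Example (illustrative)::

            _address_family('127.0.0.1')    # -> socket.AF_INET
            _address_family('::1')          # -> socket.AF_INET6
            _address_family('example.com')  # -> socket.AF_UNSPEC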
    """
    if address.startswith('[') and address.endswith(']'):
        return socket.AF_INET6
    for af in (socket.AF_INET, socket.AF_INET6):
        try:
            socket.inet_pton(af, address)
            return af
        except (ValueError, AttributeError, socket.error):
            continue
    return socket.AF_UNSPEC


def get_ip_port_afi(host_and_port_str):
    """
        Parse the IP and port from a string in the format of:

            * host_or_ip          <- Can be either an IPv4 address literal or a hostname/fqdn
            * host_or_ipv4:port   <- Can be either an IPv4 address literal or a hostname/fqdn
            * [host_or_ip]        <- IPv6 address literal
            * [host_or_ip]:port   <- IPv6 address literal

        .. note:: IPv6 address literals with ports *must* be enclosed in brackets

        .. note:: If the port is not specified, the default Kafka port (9092) will be returned.

        :return: tuple (host, port, afi), afi will be socket.AF_INET or socket.AF_INET6 or socket.AF_UNSPEC
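
        Example (illustrative)::

            get_ip_port_afi('127.0.0.1:1234')      # -> ('127.0.0.1', 1234, socket.AF_INET)
            get_ip_port_afi('[::1]')               # -> ('::1', 9092, socket.AF_INET6)
            get_ip_port_afi('broker.example.com')  # -> ('broker.example.com', 9092, socket.AF_UNSPEC)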
    """
    host_and_port_str = host_and_port_str.strip()
    if host_and_port_str.startswith('['):
        af = socket.AF_INET6
        host, rest = host_and_port_str[1:].split(']')
        if rest:
            port = int(rest[1:])
        else:
            port = DEFAULT_KAFKA_PORT
        return host, port, af
    else:
        if ':' not in host_and_port_str:
            af = _address_family(host_and_port_str)
            return host_and_port_str, DEFAULT_KAFKA_PORT, af
        else:
            # now we have something with a colon in it and no square brackets. It could be
            # either an IPv6 address literal (e.g., "::1") or an IP:port pair or a host:port pair
            try:
                # if it decodes as an IPv6 address, use that
                socket.inet_pton(socket.AF_INET6, host_and_port_str)
                return host_and_port_str, DEFAULT_KAFKA_PORT, socket.AF_INET6
            except AttributeError:
                log.warning('socket.inet_pton not available on this platform.'
                            ' consider `pip install win_inet_pton`')
                pass
            except (ValueError, socket.error):
                # it's a host:port pair
                pass
            host, port = host_and_port_str.rsplit(':', 1)
            port = int(port)

            af = _address_family(host)
            return host, port, af


def collect_hosts(hosts, randomize=True):
    """
    Collect a comma-separated set of hosts (host:port) and optionally
    randomize the returned list.
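
    Example (illustrative)::

        collect_hosts('kafka1:9092,kafka2', randomize=False)
        # -> [('kafka1', 9092, socket.AF_UNSPEC), ('kafka2', 9092, socket.AF_UNSPEC)]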
    """

    if isinstance(hosts, six.string_types):
        hosts = hosts.strip().split(',')

    result = []
    afi = socket.AF_INET
    for host_port in hosts:

        host, port, afi = get_ip_port_afi(host_port)

        if port < 0:
            port = DEFAULT_KAFKA_PORT

        result.append((host, port, afi))

    if randomize:
        shuffle(result)

    return result


def is_inet_4_or_6(gai):
    """Given a getaddrinfo struct, return True iff ipv4 or ipv6"""
    return gai[0] in (socket.AF_INET, socket.AF_INET6)


def dns_lookup(host, port, afi=socket.AF_UNSPEC):
    """Returns a list of getaddrinfo structs, optionally filtered to an afi (ipv4 / ipv6)"""
    # XXX: all DNS functions in Python are blocking. If we really
    # want to be non-blocking here, we need to use a 3rd-party
    # library like python-adns, or move resolution onto its
    # own thread. This will be subject to the default libc
    # name resolution timeout (5s on most Linux boxes)
    try:
        return list(filter(is_inet_4_or_6,
                           socket.getaddrinfo(host, port, afi,
                                              socket.SOCK_STREAM)))
    except socket.gaierror as ex:
        log.warning('DNS lookup failed for %s:%d,'
                    ' exception was %s. Is your'
                    ' advertised.listeners (called'
                    ' advertised.host.name before Kafka 0.9)'
                    ' correct and resolvable?',
                    host, port, ex)
        return []