1from __future__ import absolute_import
2
3import collections
4import logging
5
6import kafka.errors as Errors
7from kafka.protocol.api import RequestHeader
8from kafka.protocol.commit import GroupCoordinatorResponse
9from kafka.protocol.frame import KafkaBytes
10from kafka.protocol.types import Int32
11from kafka.version import __version__
12
# Logger for this module; named after the module so its level and handlers
# can be configured through the standard logging hierarchy.
log = logging.getLogger(name=__name__)
14
15
class KafkaProtocol(object):
    """Manage the kafka network protocol

    Use an instance of KafkaProtocol to manage bytes send/recv'd
    from a network socket to a broker.

    Outgoing: ``send_request`` encodes a request as a 4-byte Int32 size
    prefix followed by the request header and body, and queues the result in
    ``bytes_to_send``; ``send_bytes`` drains that queue. Requests that expect
    a response are also appended, in send order, to ``in_flight_requests``.

    Incoming: ``receive_bytes`` is a two-state parser. While ``_receiving``
    is False it accumulates the 4-byte size prefix in ``_header``; once the
    prefix is complete it allocates ``_rbuffer`` of that size and
    accumulates the payload. Each completed payload is matched against the
    oldest in-flight request and decoded.
    """
    def __init__(self, client_id=None, api_version=None):
        if client_id is None:
            client_id = self._gen_client_id()
        self._client_id = client_id
        # Within this class, only consulted for the 0.8.2
        # GroupCoordinatorResponse correlation-id quirk.
        self._api_version = api_version
        self._correlation_id = 0
        # Holds the 4-byte Int32 size prefix of the response being read.
        self._header = KafkaBytes(4)
        # Payload buffer for the response currently being read, else None.
        self._rbuffer = None
        # False: reading the size prefix; True: reading the payload.
        self._receiving = False
        # (correlation_id, request) pairs awaiting a response, oldest first.
        self.in_flight_requests = collections.deque()
        # Encoded, size-prefixed requests not yet handed to the network.
        self.bytes_to_send = []

    def _next_correlation_id(self):
        """Return the next auto-generated correlation id.

        Ids increase by one and wrap modulo 2**31 so they always fit in a
        non-negative Int32.
        """
        self._correlation_id = (self._correlation_id + 1) % 2**31
        return self._correlation_id

    def _gen_client_id(self):
        """Return the default client id used when none is supplied."""
        # NOTE(review): no separator is inserted between the name and the
        # version (e.g. 'kafka-python1.2.3'). Left unchanged because the
        # client id is visible to brokers.
        return 'kafka-python' + __version__

    def send_request(self, request, correlation_id=None):
        """Encode and queue a kafka api request for sending.

        The encoded bytes (Int32 size prefix + request header + request body)
        are appended to ``bytes_to_send``. If ``request.expect_response()``
        is true, the request is also recorded in ``in_flight_requests`` so
        its response can be matched later.

        Arguments:
            request (object): An un-encoded kafka request.
            correlation_id (int, optional): Optionally specify an ID to
                correlate requests with responses. If not provided, an ID will
                be generated automatically.

        Returns:
            correlation_id
        """
        log.debug('Sending request %s', request)
        if correlation_id is None:
            correlation_id = self._next_correlation_id()
        header = RequestHeader(request,
                               correlation_id=correlation_id,
                               client_id=self._client_id)
        message = b''.join([header.encode(), request.encode()])
        size = Int32.encode(len(message))
        data = size + message
        self.bytes_to_send.append(data)
        if request.expect_response():
            ifr = (correlation_id, request)
            self.in_flight_requests.append(ifr)
        return correlation_id

    def send_bytes(self):
        """Retrieve all pending bytes to send on the network.

        Returns the concatenation of every queued request and empties the
        queue.
        """
        data = b''.join(self.bytes_to_send)
        self.bytes_to_send = []
        return data

    def receive_bytes(self, data):
        """Process bytes received from the network.

        ``data`` may contain any fragment of the response stream. Partial
        size prefixes and payloads are buffered across calls.

        Arguments:
            data (bytes): any length bytes received from a network connection
                to a kafka broker.

        Returns:
            responses (list of (correlation_id, response)): any/all completed
                responses, decoded from bytes to python objects.

        Raises:
            KafkaProtocolError: if the bytes received could not be decoded,
                including a size prefix too small to hold a correlation id.
            CorrelationIdError: if the response does not match the request
                correlation id.

        If an exception is raised, the parser is reset to expect a fresh
        size prefix. Responses already decoded earlier in the same call are
        not returned.
        """
        i = 0
        n = len(data)
        responses = []
        while i < n:

            # Not receiving is the state of reading the payload header
            if not self._receiving:
                bytes_to_read = min(4 - self._header.tell(), n - i)
                self._header.write(data[i:i+bytes_to_read])
                i += bytes_to_read

                if self._header.tell() == 4:
                    self._header.seek(0)
                    nbytes = Int32.decode(self._header)
                    if nbytes < 4:
                        # Every response payload starts with an Int32
                        # correlation id, so a smaller (or negative) size can
                        # never be decoded. Reset first so the bad prefix is
                        # not re-decoded on the next call.
                        self._reset_buffer()
                        raise Errors.KafkaProtocolError(
                            'Invalid response size: %d' % (nbytes,))
                    # reset buffer and switch state to receiving payload bytes
                    self._rbuffer = KafkaBytes(nbytes)
                    self._receiving = True
                elif self._header.tell() > 4:
                    raise Errors.KafkaError('this should not happen - are you threading?')

            if self._receiving:
                total_bytes = len(self._rbuffer)
                staged_bytes = self._rbuffer.tell()
                bytes_to_read = min(total_bytes - staged_bytes, n - i)
                self._rbuffer.write(data[i:i+bytes_to_read])
                i += bytes_to_read

                staged_bytes = self._rbuffer.tell()
                if staged_bytes > total_bytes:
                    raise Errors.KafkaError('Receive buffer has more bytes than expected?')

                if staged_bytes != total_bytes:
                    # Payload incomplete; wait for more data.
                    break

                self._rbuffer.seek(0)
                try:
                    resp = self._process_response(self._rbuffer)
                finally:
                    # Always go back to reading a fresh size prefix. Without
                    # this, a failed response would leave _header at
                    # position 4 and the stale prefix would be decoded again
                    # on the next call, misframing the stream.
                    self._reset_buffer()
                responses.append(resp)
        return responses

    def _process_response(self, read_buffer):
        """Match a complete response payload to its request and decode it.

        Arguments:
            read_buffer (KafkaBytes): the full response payload (without the
                size prefix), positioned at its start.

        Returns:
            (correlation_id, response): the correlation id of the matched
                request as it was sent, and the decoded response object.

        Raises:
            CorrelationIdError: if no request is in flight, or if the oldest
                in-flight request's id differs from the received id (outside
                the 0.8.2 GroupCoordinator quirk). In the mismatch case the
                oldest in-flight request has already been dequeued.
            KafkaProtocolError: if the payload cannot be decoded as the
                request's RESPONSE_TYPE.
        """
        recv_correlation_id = Int32.decode(read_buffer)
        log.debug('Received correlation id: %d', recv_correlation_id)

        if not self.in_flight_requests:
            raise Errors.CorrelationIdError(
                'No in-flight-request found for server response'
                ' with correlation ID %d'
                % (recv_correlation_id,))

        # Responses are matched strictly in send order (FIFO).
        (correlation_id, request) = self.in_flight_requests.popleft()

        # 0.8.2 quirk
        if (self._api_version == (0, 8, 2) and
            request.RESPONSE_TYPE is GroupCoordinatorResponse[0] and
            correlation_id != 0 and
            recv_correlation_id == 0):
            log.warning('Kafka 0.8.2 quirk -- GroupCoordinatorResponse'
                        ' Correlation ID does not match request. This'
                        ' should go away once at least one topic has been'
                        ' initialized on the broker.')

        elif correlation_id != recv_correlation_id:
            # return or raise?
            raise Errors.CorrelationIdError(
                'Correlation IDs do not match: sent %d, recv %d'
                % (correlation_id, recv_correlation_id))

        # decode response
        log.debug('Processing response %s', request.RESPONSE_TYPE.__name__)
        try:
            response = request.RESPONSE_TYPE.decode(read_buffer)
        except ValueError:
            # Log the whole payload to aid debugging, then surface it as a
            # protocol error.
            read_buffer.seek(0)
            buf = read_buffer.read()
            log.error('Response %d [ResponseType: %s Request: %s]:'
                      ' Unable to decode %d-byte buffer: %r',
                      correlation_id, request.RESPONSE_TYPE,
                      request, len(buf), buf)
            raise Errors.KafkaProtocolError('Unable to decode response')

        return (correlation_id, response)

    def _reset_buffer(self):
        """Return the receive parser to the 'read a new size prefix' state."""
        self._receiving = False
        self._header.seek(0)
        self._rbuffer = None
178