1from __future__ import absolute_import 2 3import collections 4import logging 5 6import kafka.errors as Errors 7from kafka.protocol.api import RequestHeader 8from kafka.protocol.commit import GroupCoordinatorResponse 9from kafka.protocol.frame import KafkaBytes 10from kafka.protocol.types import Int32 11from kafka.version import __version__ 12 13log = logging.getLogger(__name__) 14 15 16class KafkaProtocol(object): 17 """Manage the kafka network protocol 18 19 Use an instance of KafkaProtocol to manage bytes send/recv'd 20 from a network socket to a broker. 21 """ 22 def __init__(self, client_id=None, api_version=None): 23 if client_id is None: 24 client_id = self._gen_client_id() 25 self._client_id = client_id 26 self._api_version = api_version 27 self._correlation_id = 0 28 self._header = KafkaBytes(4) 29 self._rbuffer = None 30 self._receiving = False 31 self.in_flight_requests = collections.deque() 32 self.bytes_to_send = [] 33 34 def _next_correlation_id(self): 35 self._correlation_id = (self._correlation_id + 1) % 2**31 36 return self._correlation_id 37 38 def _gen_client_id(self): 39 return 'kafka-python' + __version__ 40 41 def send_request(self, request, correlation_id=None): 42 """Encode and queue a kafka api request for sending. 43 44 Arguments: 45 request (object): An un-encoded kafka request. 46 correlation_id (int, optional): Optionally specify an ID to 47 correlate requests with responses. If not provided, an ID will 48 be generated automatically. 49 50 Returns: 51 correlation_id 52 """ 53 log.debug('Sending request %s', request) 54 if correlation_id is None: 55 correlation_id = self._next_correlation_id() 56 header = RequestHeader(request, 57 correlation_id=correlation_id, 58 client_id=self._client_id) 59 message = b''.join([header.encode(), request.encode()]) 60 size = Int32.encode(len(message)) 61 data = size + message 62 self.bytes_to_send.append(data) 63 if request.expect_response(): 64 ifr = (correlation_id, request) 65 self.in_flight_requests.append(ifr) 66 return correlation_id 67 68 def send_bytes(self): 69 """Retrieve all pending bytes to send on the network""" 70 data = b''.join(self.bytes_to_send) 71 self.bytes_to_send = [] 72 return data 73 74 def receive_bytes(self, data): 75 """Process bytes received from the network. 76 77 Arguments: 78 data (bytes): any length bytes received from a network connection 79 to a kafka broker. 80 81 Returns: 82 responses (list of (correlation_id, response)): any/all completed 83 responses, decoded from bytes to python objects. 84 85 Raises: 86 KafkaProtocolError: if the bytes received could not be decoded. 87 CorrelationIdError: if the response does not match the request 88 correlation id. 89 """ 90 i = 0 91 n = len(data) 92 responses = [] 93 while i < n: 94 95 # Not receiving is the state of reading the payload header 96 if not self._receiving: 97 bytes_to_read = min(4 - self._header.tell(), n - i) 98 self._header.write(data[i:i+bytes_to_read]) 99 i += bytes_to_read 100 101 if self._header.tell() == 4: 102 self._header.seek(0) 103 nbytes = Int32.decode(self._header) 104 # reset buffer and switch state to receiving payload bytes 105 self._rbuffer = KafkaBytes(nbytes) 106 self._receiving = True 107 elif self._header.tell() > 4: 108 raise Errors.KafkaError('this should not happen - are you threading?') 109 110 if self._receiving: 111 total_bytes = len(self._rbuffer) 112 staged_bytes = self._rbuffer.tell() 113 bytes_to_read = min(total_bytes - staged_bytes, n - i) 114 self._rbuffer.write(data[i:i+bytes_to_read]) 115 i += bytes_to_read 116 117 staged_bytes = self._rbuffer.tell() 118 if staged_bytes > total_bytes: 119 raise Errors.KafkaError('Receive buffer has more bytes than expected?') 120 121 if staged_bytes != total_bytes: 122 break 123 124 self._receiving = False 125 self._rbuffer.seek(0) 126 resp = self._process_response(self._rbuffer) 127 responses.append(resp) 128 self._reset_buffer() 129 return responses 130 131 def _process_response(self, read_buffer): 132 recv_correlation_id = Int32.decode(read_buffer) 133 log.debug('Received correlation id: %d', recv_correlation_id) 134 135 if not self.in_flight_requests: 136 raise Errors.CorrelationIdError( 137 'No in-flight-request found for server response' 138 ' with correlation ID %d' 139 % (recv_correlation_id,)) 140 141 (correlation_id, request) = self.in_flight_requests.popleft() 142 143 # 0.8.2 quirk 144 if (self._api_version == (0, 8, 2) and 145 request.RESPONSE_TYPE is GroupCoordinatorResponse[0] and 146 correlation_id != 0 and 147 recv_correlation_id == 0): 148 log.warning('Kafka 0.8.2 quirk -- GroupCoordinatorResponse' 149 ' Correlation ID does not match request. This' 150 ' should go away once at least one topic has been' 151 ' initialized on the broker.') 152 153 elif correlation_id != recv_correlation_id: 154 # return or raise? 155 raise Errors.CorrelationIdError( 156 'Correlation IDs do not match: sent %d, recv %d' 157 % (correlation_id, recv_correlation_id)) 158 159 # decode response 160 log.debug('Processing response %s', request.RESPONSE_TYPE.__name__) 161 try: 162 response = request.RESPONSE_TYPE.decode(read_buffer) 163 except ValueError: 164 read_buffer.seek(0) 165 buf = read_buffer.read() 166 log.error('Response %d [ResponseType: %s Request: %s]:' 167 ' Unable to decode %d-byte buffer: %r', 168 correlation_id, request.RESPONSE_TYPE, 169 request, len(buf), buf) 170 raise Errors.KafkaProtocolError('Unable to decode response') 171 172 return (correlation_id, response) 173 174 def _reset_buffer(self): 175 self._receiving = False 176 self._header.seek(0) 177 self._rbuffer = None 178