1from __future__ import absolute_import
2
3import inspect
4import sys
5
6
class KafkaError(RuntimeError):
    """Root exception type for the errors defined in this module.

    Class attributes:
        retriable: False here; subclasses set it to True to flag themselves
            as retriable.
        invalid_metadata: whether metadata should be refreshed on error.
    """
    retriable = False
    # whether metadata should be refreshed on error
    invalid_metadata = False

    def __str__(self):
        """Return the class name, followed by ``': <args text>'`` if any args."""
        label = self.__class__.__name__
        if self.args:
            detail = super(KafkaError, self).__str__()
            return '{0}: {1}'.format(label, detail)
        # No constructor arguments: the bare class name is the whole message.
        return label
17
18
class IllegalStateError(KafkaError):
    """KafkaError subtype that overrides nothing from its base class.

    Presumably signals an operation attempted in a state that does not
    permit it (inferred from the name only).
    """
21
22
class IllegalArgumentError(KafkaError):
    """KafkaError subtype that overrides nothing from its base class.

    Presumably signals an invalid argument value (inferred from the name
    only).
    """
25
26
class NoBrokersAvailable(KafkaError):
    """KafkaError subtype flagged as retriable and as invalidating metadata."""
    retriable = True
    # metadata should be refreshed when this error occurs
    invalid_metadata = True
30
31
class NodeNotReadyError(KafkaError):
    """KafkaError subtype flagged as retriable.

    ``invalid_metadata`` is not overridden, so the base-class value applies.
    """
    retriable = True
34
35
class KafkaProtocolError(KafkaError):
    """KafkaError subtype flagged as retriable.

    Serves as the base class of CorrelationIdError in this module.
    """
    retriable = True
38
39
class CorrelationIdError(KafkaProtocolError):
    """KafkaProtocolError subtype flagged as retriable."""
    # Restates the value already set on KafkaProtocolError.
    retriable = True
42
43
class Cancelled(KafkaError):
    """KafkaError subtype flagged as retriable.

    NOTE(review): presumably marks a cancelled request or future; the raise
    sites are not visible in this module.
    """
    retriable = True
46
47
class TooManyInFlightRequests(KafkaError):
    """KafkaError subtype flagged as retriable.

    ``invalid_metadata`` is not overridden, so the base-class value applies.
    """
    retriable = True
50
51
class StaleMetadata(KafkaError):
    """KafkaError subtype flagged as retriable and as invalidating metadata."""
    retriable = True
    # metadata should be refreshed when this error occurs
    invalid_metadata = True
55
56
class MetadataEmptyBrokerList(KafkaError):
    """KafkaError subtype flagged as retriable.

    ``invalid_metadata`` is not overridden, so the base-class value applies.
    """
    retriable = True
59
60
class UnrecognizedBrokerVersion(KafkaError):
    """KafkaError subtype that overrides nothing from its base class.

    Presumably signals that the broker's version could not be identified
    (inferred from the name only).
    """
63
64
class IncompatibleBrokerVersion(KafkaError):
    """KafkaError subtype that overrides nothing from its base class.

    Presumably signals a broker version that cannot serve a requested
    feature (inferred from the name only).
    """
67
68
class CommitFailedError(KafkaError):
    """KafkaError whose first argument is a fixed commit-failure explanation.

    The explanation says the commit could not be completed because the group
    had already rebalanced, and suggests tuning ``max_poll_interval_ms`` or
    ``max_poll_records``.
    """

    def __init__(self, *args, **kwargs):
        """Prepend the fixed explanation to ``args`` and initialise the base.

        NOTE(review): ``**kwargs`` is forwarded unchanged; built-in exception
        initialisers reject keyword arguments, so confirm it is ever used.
        """
        # The literal keeps its embedded newlines and indentation on purpose:
        # it is the runtime message and must not be reflowed.
        explanation = """Commit cannot be completed since the group has already
            rebalanced and assigned the partitions to another member.
            This means that the time between subsequent calls to poll()
            was longer than the configured max_poll_interval_ms, which
            typically implies that the poll loop is spending too much
            time message processing. You can address this either by
            increasing the rebalance timeout with max_poll_interval_ms,
            or by reducing the maximum size of batches returned in poll()
            with max_poll_records.
            """
        super(CommitFailedError, self).__init__(explanation, *args, **kwargs)
82
83
class AuthenticationMethodNotSupported(KafkaError):
    """KafkaError subtype that overrides nothing from its base class.

    Presumably signals a requested authentication method that is not
    available (inferred from the name only).
    """
86
87
class AuthenticationFailedError(KafkaError):
    """KafkaError subtype explicitly marked as not retriable."""
    # Same value KafkaError already defines; restated explicitly here.
    retriable = False
90
91
class BrokerResponseError(KafkaError):
    """Base class for errors identified by a numeric ``errno``.

    Subclasses in this module override ``errno``, ``message`` and
    ``description``; all three are None on this base class.
    """
    errno = None
    message = None
    description = None

    def __str__(self):
        """Return the KafkaError text prefixed with ``[Error <errno>]``."""
        base_text = super(BrokerResponseError, self).__str__()
        return '[Error {0}] {1}'.format(self.errno, base_text)
102
103
class NoError(BrokerResponseError):
    """Broker response error class for code 0 (``NO_ERROR``)."""
    errno = 0
    message = 'NO_ERROR'
    description = 'No error--it worked!'
108
109
class UnknownError(BrokerResponseError):
    """Broker response error class for code -1 (``UNKNOWN``).

    Also used in this module as the fallback class when an error code has no
    registered class (see ``for_code`` and ``check_error``).
    """
    errno = -1
    message = 'UNKNOWN'
    description = 'An unexpected server error.'
114
115
class OffsetOutOfRangeError(BrokerResponseError):
    """Broker response error class for code 1 (``OFFSET_OUT_OF_RANGE``)."""
    errno = 1
    message = 'OFFSET_OUT_OF_RANGE'
    description = (
        'The requested offset is outside the range of offsets '
        'maintained by the server for the given topic/partition.'
    )
121
122
class CorruptRecordException(BrokerResponseError):
    """Broker response error class for code 2 (``CORRUPT_MESSAGE``)."""
    errno = 2
    message = 'CORRUPT_MESSAGE'
    description = (
        'This message has failed its CRC checksum, exceeds the '
        'valid size, or is otherwise corrupt.'
    )
128
# Backward compatibility: InvalidMessageError is kept as a second name for
# CorruptRecordException (the same class object, not a subclass).
InvalidMessageError = CorruptRecordException
131
132
class UnknownTopicOrPartitionError(BrokerResponseError):
    """Broker response error class for code 3 (``UNKNOWN_TOPIC_OR_PARTITION``).

    Flagged as retriable and as invalidating metadata.
    """
    errno = 3
    message = 'UNKNOWN_TOPIC_OR_PARTITION'
    description = (
        'This request is for a topic or partition that does not '
        'exist on this broker.'
    )
    retriable = True
    invalid_metadata = True
140
141
class InvalidFetchRequestError(BrokerResponseError):
    """Broker response error class for code 4 (``INVALID_FETCH_SIZE``)."""
    errno = 4
    message = 'INVALID_FETCH_SIZE'
    description = 'The message has a negative size.'
146
147
class LeaderNotAvailableError(BrokerResponseError):
    """Broker response error class for code 5 (``LEADER_NOT_AVAILABLE``).

    Flagged as retriable and as invalidating metadata.
    """
    errno = 5
    message = 'LEADER_NOT_AVAILABLE'
    description = (
        'This error is thrown if we are in the middle of a '
        'leadership election and there is currently no leader for '
        'this partition and hence it is unavailable for writes.'
    )
    retriable = True
    invalid_metadata = True
156
157
class NotLeaderForPartitionError(BrokerResponseError):
    """Broker response error class for code 6 (``NOT_LEADER_FOR_PARTITION``).

    Flagged as retriable and as invalidating metadata.
    """
    errno = 6
    message = 'NOT_LEADER_FOR_PARTITION'
    description = (
        'This error is thrown if the client attempts to send '
        'messages to a replica that is not the leader for some '
        'partition. It indicates that the clients metadata is out '
        'of date.'
    )
    retriable = True
    invalid_metadata = True
167
168
class RequestTimedOutError(BrokerResponseError):
    """Broker response error class for code 7 (``REQUEST_TIMED_OUT``).

    Flagged as retriable.
    """
    errno = 7
    message = 'REQUEST_TIMED_OUT'
    description = (
        'This error is thrown if the request exceeds the '
        'user-specified time limit in the request.'
    )
    retriable = True
175
176
class BrokerNotAvailableError(BrokerResponseError):
    """Broker response error class for code 8 (``BROKER_NOT_AVAILABLE``)."""
    errno = 8
    message = 'BROKER_NOT_AVAILABLE'
    description = (
        'This is not a client facing error and is used mostly by '
        'tools when a broker is not alive.'
    )
182
183
class ReplicaNotAvailableError(BrokerResponseError):
    """Broker response error class for code 9 (``REPLICA_NOT_AVAILABLE``)."""
    errno = 9
    message = 'REPLICA_NOT_AVAILABLE'
    description = (
        'If replica is expected on a broker, but is not (this can be '
        'safely ignored).'
    )
189
190
class MessageSizeTooLargeError(BrokerResponseError):
    """Broker response error class for code 10 (``MESSAGE_SIZE_TOO_LARGE``)."""
    errno = 10
    message = 'MESSAGE_SIZE_TOO_LARGE'
    description = (
        'The server has a configurable maximum message size to avoid '
        'unbounded memory allocation. This error is thrown if the '
        'client attempt to produce a message larger than this '
        'maximum.'
    )
198
199
class StaleControllerEpochError(BrokerResponseError):
    """Broker response error class for code 11 (``STALE_CONTROLLER_EPOCH``)."""
    errno = 11
    message = 'STALE_CONTROLLER_EPOCH'
    description = 'Internal error code for broker-to-broker communication.'
204
205
class OffsetMetadataTooLargeError(BrokerResponseError):
    """Broker response error class for code 12 (``OFFSET_METADATA_TOO_LARGE``)."""
    errno = 12
    message = 'OFFSET_METADATA_TOO_LARGE'
    description = (
        'If you specify a string larger than configured maximum for '
        'offset metadata.'
    )
211
212
# TODO: is this deprecated? See the error-code table at
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
class StaleLeaderEpochCodeError(BrokerResponseError):
    """Broker response error class for code 13 (``STALE_LEADER_EPOCH_CODE``).

    No ``description`` is set here, so the inherited value (None) applies.
    """
    errno = 13
    message = 'STALE_LEADER_EPOCH_CODE'
217
218
class GroupLoadInProgressError(BrokerResponseError):
    """Broker response error class for code 14 (``OFFSETS_LOAD_IN_PROGRESS``).

    Flagged as retriable.
    """
    errno = 14
    message = 'OFFSETS_LOAD_IN_PROGRESS'
    description = (
        'The broker returns this error code for an offset fetch '
        'request if it is still loading offsets (after a leader '
        'change for that offsets topic partition), or in response '
        'to group membership requests (such as heartbeats) when '
        'group metadata is being loaded by the coordinator.'
    )
    retriable = True
228
229
class GroupCoordinatorNotAvailableError(BrokerResponseError):
    """Broker response error class for code 15.

    The wire name stored in ``message`` is
    ``CONSUMER_COORDINATOR_NOT_AVAILABLE``. Flagged as retriable.
    """
    errno = 15
    message = 'CONSUMER_COORDINATOR_NOT_AVAILABLE'
    description = (
        'The broker returns this error code for group coordinator '
        'requests, offset commits, and most group management '
        'requests if the offsets topic has not yet been created, or '
        'if the group coordinator is not active.'
    )
    retriable = True
238
239
class NotCoordinatorForGroupError(BrokerResponseError):
    """Broker response error class for code 16.

    The wire name stored in ``message`` is ``NOT_COORDINATOR_FOR_CONSUMER``.
    Flagged as retriable.
    """
    errno = 16
    message = 'NOT_COORDINATOR_FOR_CONSUMER'
    description = (
        'The broker returns this error code if it receives an offset '
        'fetch or commit request for a group that it is not a '
        'coordinator for.'
    )
    retriable = True
247
248
class InvalidTopicError(BrokerResponseError):
    """Broker response error class for code 17 (``INVALID_TOPIC``)."""
    errno = 17
    message = 'INVALID_TOPIC'
    description = (
        'For a request which attempts to access an invalid topic '
        '(e.g. one which has an illegal name), or if an attempt '
        'is made to write to an internal topic (such as the '
        'consumer offsets topic).'
    )
256
257
class RecordListTooLargeError(BrokerResponseError):
    """Broker response error class for code 18 (``RECORD_LIST_TOO_LARGE``)."""
    errno = 18
    message = 'RECORD_LIST_TOO_LARGE'
    description = (
        'If a message batch in a produce request exceeds the maximum '
        'configured segment size.'
    )
263
264
class NotEnoughReplicasError(BrokerResponseError):
    """Broker response error class for code 19 (``NOT_ENOUGH_REPLICAS``).

    Flagged as retriable.
    """
    errno = 19
    message = 'NOT_ENOUGH_REPLICAS'
    description = (
        'Returned from a produce request when the number of in-sync '
        'replicas is lower than the configured minimum and '
        'requiredAcks is -1.'
    )
    retriable = True
272
273
class NotEnoughReplicasAfterAppendError(BrokerResponseError):
    """Broker response error class for code 20.

    The wire name stored in ``message`` is
    ``NOT_ENOUGH_REPLICAS_AFTER_APPEND``. Flagged as retriable.
    """
    errno = 20
    message = 'NOT_ENOUGH_REPLICAS_AFTER_APPEND'
    description = (
        'Returned from a produce request when the message was '
        'written to the log, but with fewer in-sync replicas than '
        'required.'
    )
    retriable = True
281
282
class InvalidRequiredAcksError(BrokerResponseError):
    """Broker response error class for code 21 (``INVALID_REQUIRED_ACKS``)."""
    errno = 21
    message = 'INVALID_REQUIRED_ACKS'
    description = (
        'Returned from a produce request if the requested '
        'requiredAcks is invalid (anything other than -1, 1, or 0).'
    )
288
289
class IllegalGenerationError(BrokerResponseError):
    """Broker response error class for code 22 (``ILLEGAL_GENERATION``)."""
    errno = 22
    message = 'ILLEGAL_GENERATION'
    description = (
        'Returned from group membership requests (such as heartbeats) '
        'when the generation id provided in the request is not the '
        'current generation.'
    )
296
297
class InconsistentGroupProtocolError(BrokerResponseError):
    """Broker response error class for code 23.

    The wire name stored in ``message`` is ``INCONSISTENT_GROUP_PROTOCOL``.
    """
    errno = 23
    message = 'INCONSISTENT_GROUP_PROTOCOL'
    description = (
        'Returned in join group when the member provides a protocol '
        'type or set of protocols which is not compatible with the '
        'current group.'
    )
304
305
class InvalidGroupIdError(BrokerResponseError):
    """Broker response error class for code 24 (``INVALID_GROUP_ID``)."""
    errno = 24
    message = 'INVALID_GROUP_ID'
    description = 'Returned in join group when the groupId is empty or null.'
310
311
class UnknownMemberIdError(BrokerResponseError):
    """Broker response error class for code 25 (``UNKNOWN_MEMBER_ID``)."""
    errno = 25
    message = 'UNKNOWN_MEMBER_ID'
    description = (
        'Returned from group requests (offset commits/fetches, '
        'heartbeats, etc) when the memberId is not in the current '
        'generation.'
    )
318
319
class InvalidSessionTimeoutError(BrokerResponseError):
    """Broker response error class for code 26 (``INVALID_SESSION_TIMEOUT``)."""
    errno = 26
    message = 'INVALID_SESSION_TIMEOUT'
    description = (
        'Return in join group when the requested session timeout is '
        'outside of the allowed range on the broker'
    )
325
326
class RebalanceInProgressError(BrokerResponseError):
    """Broker response error class for code 27 (``REBALANCE_IN_PROGRESS``)."""
    errno = 27
    message = 'REBALANCE_IN_PROGRESS'
    description = (
        'Returned in heartbeat requests when the coordinator has '
        'begun rebalancing the group. This indicates to the client '
        'that it should rejoin the group.'
    )
333
334
class InvalidCommitOffsetSizeError(BrokerResponseError):
    """Broker response error class for code 28.

    The wire name stored in ``message`` is ``INVALID_COMMIT_OFFSET_SIZE``.
    """
    errno = 28
    message = 'INVALID_COMMIT_OFFSET_SIZE'
    description = (
        'This error indicates that an offset commit was rejected '
        'because of oversize metadata.'
    )
340
341
class TopicAuthorizationFailedError(BrokerResponseError):
    """Broker response error class for code 29.

    The wire name stored in ``message`` is ``TOPIC_AUTHORIZATION_FAILED``.
    """
    errno = 29
    message = 'TOPIC_AUTHORIZATION_FAILED'
    description = (
        'Returned by the broker when the client is not authorized to '
        'access the requested topic.'
    )
347
348
class GroupAuthorizationFailedError(BrokerResponseError):
    """Broker response error class for code 30.

    The wire name stored in ``message`` is ``GROUP_AUTHORIZATION_FAILED``.
    """
    errno = 30
    message = 'GROUP_AUTHORIZATION_FAILED'
    description = (
        'Returned by the broker when the client is not authorized to '
        'access a particular groupId.'
    )
354
355
class ClusterAuthorizationFailedError(BrokerResponseError):
    """Broker response error class for code 31.

    The wire name stored in ``message`` is ``CLUSTER_AUTHORIZATION_FAILED``.
    """
    errno = 31
    message = 'CLUSTER_AUTHORIZATION_FAILED'
    description = (
        'Returned by the broker when the client is not authorized to '
        'use an inter-broker or administrative API.'
    )
361
362
class InvalidTimestampError(BrokerResponseError):
    """Broker response error class for code 32 (``INVALID_TIMESTAMP``)."""
    errno = 32
    message = 'INVALID_TIMESTAMP'
    description = 'The timestamp of the message is out of acceptable range.'
367
368
class UnsupportedSaslMechanismError(BrokerResponseError):
    """Broker response error class for code 33.

    The wire name stored in ``message`` is ``UNSUPPORTED_SASL_MECHANISM``.
    """
    errno = 33
    message = 'UNSUPPORTED_SASL_MECHANISM'
    description = 'The broker does not support the requested SASL mechanism.'
373
374
class IllegalSaslStateError(BrokerResponseError):
    """Broker response error class for code 34 (``ILLEGAL_SASL_STATE``)."""
    errno = 34
    message = 'ILLEGAL_SASL_STATE'
    description = 'Request is not valid given the current SASL state.'
379
380
class UnsupportedVersionError(BrokerResponseError):
    """Broker response error class for code 35 (``UNSUPPORTED_VERSION``)."""
    errno = 35
    message = 'UNSUPPORTED_VERSION'
    description = 'The version of API is not supported.'
385
386
class TopicAlreadyExistsError(BrokerResponseError):
    """Broker response error class for code 36 (``TOPIC_ALREADY_EXISTS``)."""
    errno = 36
    message = 'TOPIC_ALREADY_EXISTS'
    description = 'Topic with this name already exists.'
391
392
class InvalidPartitionsError(BrokerResponseError):
    """Broker response error class for code 37 (``INVALID_PARTITIONS``)."""
    errno = 37
    message = 'INVALID_PARTITIONS'
    description = 'Number of partitions is invalid.'
397
398
class InvalidReplicationFactorError(BrokerResponseError):
    """Broker response error class for code 38.

    The wire name stored in ``message`` is ``INVALID_REPLICATION_FACTOR``.
    """
    errno = 38
    message = 'INVALID_REPLICATION_FACTOR'
    description = 'Replication-factor is invalid.'
403
404
class InvalidReplicationAssignmentError(BrokerResponseError):
    """Broker response error class for code 39.

    The wire name stored in ``message`` is ``INVALID_REPLICATION_ASSIGNMENT``.
    """
    errno = 39
    message = 'INVALID_REPLICATION_ASSIGNMENT'
    description = 'Replication assignment is invalid.'
409
410
class InvalidConfigurationError(BrokerResponseError):
    """Broker response error class for code 40 (``INVALID_CONFIG``)."""
    errno = 40
    message = 'INVALID_CONFIG'
    description = 'Configuration is invalid.'
415
416
class NotControllerError(BrokerResponseError):
    """Broker response error class for code 41 (``NOT_CONTROLLER``).

    Flagged as retriable.
    """
    errno = 41
    message = 'NOT_CONTROLLER'
    description = 'This is not the correct controller for this cluster.'
    retriable = True
422
423
class InvalidRequestError(BrokerResponseError):
    """Broker response error class for code 42 (``INVALID_REQUEST``)."""
    errno = 42
    message = 'INVALID_REQUEST'
    description = (
        'This most likely occurs because of a request being '
        'malformed by the client library or the message was '
        'sent to an incompatible broker. See the broker logs '
        'for more details.'
    )
431
432
class UnsupportedForMessageFormatError(BrokerResponseError):
    """Broker response error class for code 43.

    The wire name stored in ``message`` is ``UNSUPPORTED_FOR_MESSAGE_FORMAT``.
    """
    errno = 43
    message = 'UNSUPPORTED_FOR_MESSAGE_FORMAT'
    description = (
        'The message format version on the broker does not '
        'support this request.'
    )
438
439
class PolicyViolationError(BrokerResponseError):
    """Broker response error class for code 44 (``POLICY_VIOLATION``)."""
    errno = 44
    message = 'POLICY_VIOLATION'
    description = 'Request parameters do not satisfy the configured policy.'
444
445
class KafkaUnavailableError(KafkaError):
    """KafkaError subtype that overrides nothing from its base class.

    It is listed in ``RETRY_BACKOFF_ERROR_TYPES`` in this module. Presumably
    signals that Kafka could not be reached (inferred from the name only).
    """
448
449
class KafkaTimeoutError(KafkaError):
    """KafkaError subtype that overrides nothing from its base class.

    Presumably signals a client-side operation that timed out (inferred from
    the name only).
    """
452
453
class FailedPayloadsError(KafkaError):
    """KafkaError that carries the payload it was created with.

    Arguments:
        payload: stored unchanged on the instance as ``payload``.
        *args: the only values forwarded to the base-class initialiser.
    """

    def __init__(self, payload, *args):
        self.payload = payload
        super(FailedPayloadsError, self).__init__(*args)
458
459
class KafkaConnectionError(KafkaError):
    """KafkaError subtype flagged as retriable and as invalidating metadata."""
    retriable = True
    # metadata should be refreshed when this error occurs
    invalid_metadata = True
463
464
class ConnectionError(KafkaConnectionError):
    """Deprecated subclass of KafkaConnectionError.

    NOTE(review): inside this module the name shadows Python 3's builtin
    ``ConnectionError``; presumably new code should use KafkaConnectionError
    instead -- confirm against the project's deprecation notes.
    """
467
468
class BufferUnderflowError(KafkaError):
    """KafkaError subtype that overrides nothing from its base class.

    Presumably signals that fewer bytes were available than a decoding step
    needed (inferred from the name only).
    """
471
472
class ChecksumError(KafkaError):
    """KafkaError subtype that overrides nothing from its base class.

    Presumably signals a checksum mismatch (inferred from the name only).
    """
475
476
class ConsumerFetchSizeTooSmall(KafkaError):
    """KafkaError subtype that overrides nothing from its base class.

    Presumably signals a consumer fetch size too small to hold the next
    message (inferred from the name only).
    """
479
480
class ConsumerNoMoreData(KafkaError):
    """KafkaError subtype that overrides nothing from its base class.

    Presumably signals that a consumer has no further data to return
    (inferred from the name only).
    """
483
484
class ProtocolError(KafkaError):
    """KafkaError subtype that overrides nothing from its base class.

    It derives directly from KafkaError and is unrelated by inheritance to
    KafkaProtocolError; unlike that class it is not flagged as retriable.
    """
487
488
class UnsupportedCodecError(KafkaError):
    """KafkaError subtype that overrides nothing from its base class.

    Presumably signals a compression codec that is not available (inferred
    from the name only).
    """
491
492
class KafkaConfigurationError(KafkaError):
    """KafkaError subtype that overrides nothing from its base class.

    Presumably signals an invalid client configuration (inferred from the
    name only).
    """
495
496
class QuotaViolationError(KafkaError):
    """KafkaError subtype that overrides nothing from its base class.

    Presumably signals an exceeded quota (inferred from the name only).
    """
499
500
class AsyncProducerQueueFull(KafkaError):
    """KafkaError that carries the messages it was created with.

    Arguments:
        failed_msgs: stored unchanged on the instance as ``failed_msgs``.
        *args: the only values forwarded to the base-class initialiser.
    """

    def __init__(self, failed_msgs, *args):
        self.failed_msgs = failed_msgs
        super(AsyncProducerQueueFull, self).__init__(*args)
505
506
def _iter_broker_errors():
    """Yield each BrokerResponseError subclass bound to a name in this module.

    Members are discovered with ``inspect.getmembers``, which returns them
    sorted by attribute name. A class bound to more than one module-level
    name is therefore yielded once per name. ``BrokerResponseError`` itself
    is never yielded.
    """
    module = sys.modules[__name__]
    # getmembers filters to classes itself; the attribute names are unused.
    for _, obj in inspect.getmembers(module, inspect.isclass):
        # Identity comparison: only the base class itself is skipped.
        if obj is BrokerResponseError:
            continue
        if issubclass(obj, BrokerResponseError):
            yield obj
511
512
# Registry mapping each broker error code (``errno``) to its error class.
# Built from a generator expression rather than a list comprehension so the
# loop variable is not left behind as a module-level name on Python 2.
kafka_errors = dict((err.errno, err) for err in _iter_broker_errors())
514
515
def for_code(error_code):
    """Return the error class registered in ``kafka_errors`` for a code.

    Arguments:
        error_code: key to look up in ``kafka_errors``.

    Returns:
        The registered class, or ``UnknownError`` when the code has no entry.
    """
    try:
        return kafka_errors[error_code]
    except KeyError:
        return UnknownError
518
519
def check_error(response):
    """Raise if ``response`` is an exception or carries a truthy error code.

    Arguments:
        response: either an Exception instance, or an object exposing an
            ``error`` attribute.

    Raises:
        The response itself when it is an Exception; otherwise, when
        ``response.error`` is truthy, the class registered for that code in
        ``kafka_errors`` (``UnknownError`` if none), constructed with the
        response as its only argument.
    """
    if isinstance(response, Exception):
        raise response
    code = response.error
    if not code:
        # Falsy error code: nothing to report.
        return
    raise kafka_errors.get(code, UnknownError)(response)
526
527
# Tuples of exception classes grouped for retry handling. The meaning of each
# group is inferred from its name; the code consuming them is not in this
# module.

# Presumably: errors that are retried after a back-off delay.
RETRY_BACKOFF_ERROR_TYPES = (
    KafkaUnavailableError,
    LeaderNotAvailableError,
    KafkaConnectionError,
    FailedPayloadsError,
)


# Presumably: errors that are retried after refreshing metadata.
RETRY_REFRESH_ERROR_TYPES = (
    NotLeaderForPartitionError,
    UnknownTopicOrPartitionError,
    LeaderNotAvailableError,
    KafkaConnectionError,
)


# Plain concatenation of the two tuples above, so LeaderNotAvailableError and
# KafkaConnectionError each appear twice.
RETRY_ERROR_TYPES = RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES
541