1from __future__ import absolute_import 2 3import inspect 4import sys 5 6 7class KafkaError(RuntimeError): 8 retriable = False 9 # whether metadata should be refreshed on error 10 invalid_metadata = False 11 12 def __str__(self): 13 if not self.args: 14 return self.__class__.__name__ 15 return '{0}: {1}'.format(self.__class__.__name__, 16 super(KafkaError, self).__str__()) 17 18 19class IllegalStateError(KafkaError): 20 pass 21 22 23class IllegalArgumentError(KafkaError): 24 pass 25 26 27class NoBrokersAvailable(KafkaError): 28 retriable = True 29 invalid_metadata = True 30 31 32class NodeNotReadyError(KafkaError): 33 retriable = True 34 35 36class KafkaProtocolError(KafkaError): 37 retriable = True 38 39 40class CorrelationIdError(KafkaProtocolError): 41 retriable = True 42 43 44class Cancelled(KafkaError): 45 retriable = True 46 47 48class TooManyInFlightRequests(KafkaError): 49 retriable = True 50 51 52class StaleMetadata(KafkaError): 53 retriable = True 54 invalid_metadata = True 55 56 57class MetadataEmptyBrokerList(KafkaError): 58 retriable = True 59 60 61class UnrecognizedBrokerVersion(KafkaError): 62 pass 63 64 65class IncompatibleBrokerVersion(KafkaError): 66 pass 67 68 69class CommitFailedError(KafkaError): 70 def __init__(self, *args, **kwargs): 71 super(CommitFailedError, self).__init__( 72 """Commit cannot be completed since the group has already 73 rebalanced and assigned the partitions to another member. 74 This means that the time between subsequent calls to poll() 75 was longer than the configured max_poll_interval_ms, which 76 typically implies that the poll loop is spending too much 77 time message processing. You can address this either by 78 increasing the rebalance timeout with max_poll_interval_ms, 79 or by reducing the maximum size of batches returned in poll() 80 with max_poll_records. 81 """, *args, **kwargs) 82 83 84class AuthenticationMethodNotSupported(KafkaError): 85 pass 86 87 88class AuthenticationFailedError(KafkaError): 89 retriable = False 90 91 92class BrokerResponseError(KafkaError): 93 errno = None 94 message = None 95 description = None 96 97 def __str__(self): 98 """Add errno to standard KafkaError str""" 99 return '[Error {0}] {1}'.format( 100 self.errno, 101 super(BrokerResponseError, self).__str__()) 102 103 104class NoError(BrokerResponseError): 105 errno = 0 106 message = 'NO_ERROR' 107 description = 'No error--it worked!' 108 109 110class UnknownError(BrokerResponseError): 111 errno = -1 112 message = 'UNKNOWN' 113 description = 'An unexpected server error.' 114 115 116class OffsetOutOfRangeError(BrokerResponseError): 117 errno = 1 118 message = 'OFFSET_OUT_OF_RANGE' 119 description = ('The requested offset is outside the range of offsets' 120 ' maintained by the server for the given topic/partition.') 121 122 123class CorruptRecordException(BrokerResponseError): 124 errno = 2 125 message = 'CORRUPT_MESSAGE' 126 description = ('This message has failed its CRC checksum, exceeds the' 127 ' valid size, or is otherwise corrupt.') 128 129# Backward compatibility 130InvalidMessageError = CorruptRecordException 131 132 133class UnknownTopicOrPartitionError(BrokerResponseError): 134 errno = 3 135 message = 'UNKNOWN_TOPIC_OR_PARTITION' 136 description = ('This request is for a topic or partition that does not' 137 ' exist on this broker.') 138 retriable = True 139 invalid_metadata = True 140 141 142class InvalidFetchRequestError(BrokerResponseError): 143 errno = 4 144 message = 'INVALID_FETCH_SIZE' 145 description = 'The message has a negative size.' 146 147 148class LeaderNotAvailableError(BrokerResponseError): 149 errno = 5 150 message = 'LEADER_NOT_AVAILABLE' 151 description = ('This error is thrown if we are in the middle of a' 152 ' leadership election and there is currently no leader for' 153 ' this partition and hence it is unavailable for writes.') 154 retriable = True 155 invalid_metadata = True 156 157 158class NotLeaderForPartitionError(BrokerResponseError): 159 errno = 6 160 message = 'NOT_LEADER_FOR_PARTITION' 161 description = ('This error is thrown if the client attempts to send' 162 ' messages to a replica that is not the leader for some' 163 ' partition. It indicates that the clients metadata is out' 164 ' of date.') 165 retriable = True 166 invalid_metadata = True 167 168 169class RequestTimedOutError(BrokerResponseError): 170 errno = 7 171 message = 'REQUEST_TIMED_OUT' 172 description = ('This error is thrown if the request exceeds the' 173 ' user-specified time limit in the request.') 174 retriable = True 175 176 177class BrokerNotAvailableError(BrokerResponseError): 178 errno = 8 179 message = 'BROKER_NOT_AVAILABLE' 180 description = ('This is not a client facing error and is used mostly by' 181 ' tools when a broker is not alive.') 182 183 184class ReplicaNotAvailableError(BrokerResponseError): 185 errno = 9 186 message = 'REPLICA_NOT_AVAILABLE' 187 description = ('If replica is expected on a broker, but is not (this can be' 188 ' safely ignored).') 189 190 191class MessageSizeTooLargeError(BrokerResponseError): 192 errno = 10 193 message = 'MESSAGE_SIZE_TOO_LARGE' 194 description = ('The server has a configurable maximum message size to avoid' 195 ' unbounded memory allocation. This error is thrown if the' 196 ' client attempt to produce a message larger than this' 197 ' maximum.') 198 199 200class StaleControllerEpochError(BrokerResponseError): 201 errno = 11 202 message = 'STALE_CONTROLLER_EPOCH' 203 description = 'Internal error code for broker-to-broker communication.' 204 205 206class OffsetMetadataTooLargeError(BrokerResponseError): 207 errno = 12 208 message = 'OFFSET_METADATA_TOO_LARGE' 209 description = ('If you specify a string larger than configured maximum for' 210 ' offset metadata.') 211 212 213# TODO is this deprecated? https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes 214class StaleLeaderEpochCodeError(BrokerResponseError): 215 errno = 13 216 message = 'STALE_LEADER_EPOCH_CODE' 217 218 219class GroupLoadInProgressError(BrokerResponseError): 220 errno = 14 221 message = 'OFFSETS_LOAD_IN_PROGRESS' 222 description = ('The broker returns this error code for an offset fetch' 223 ' request if it is still loading offsets (after a leader' 224 ' change for that offsets topic partition), or in response' 225 ' to group membership requests (such as heartbeats) when' 226 ' group metadata is being loaded by the coordinator.') 227 retriable = True 228 229 230class GroupCoordinatorNotAvailableError(BrokerResponseError): 231 errno = 15 232 message = 'CONSUMER_COORDINATOR_NOT_AVAILABLE' 233 description = ('The broker returns this error code for group coordinator' 234 ' requests, offset commits, and most group management' 235 ' requests if the offsets topic has not yet been created, or' 236 ' if the group coordinator is not active.') 237 retriable = True 238 239 240class NotCoordinatorForGroupError(BrokerResponseError): 241 errno = 16 242 message = 'NOT_COORDINATOR_FOR_CONSUMER' 243 description = ('The broker returns this error code if it receives an offset' 244 ' fetch or commit request for a group that it is not a' 245 ' coordinator for.') 246 retriable = True 247 248 249class InvalidTopicError(BrokerResponseError): 250 errno = 17 251 message = 'INVALID_TOPIC' 252 description = ('For a request which attempts to access an invalid topic' 253 ' (e.g. one which has an illegal name), or if an attempt' 254 ' is made to write to an internal topic (such as the' 255 ' consumer offsets topic).') 256 257 258class RecordListTooLargeError(BrokerResponseError): 259 errno = 18 260 message = 'RECORD_LIST_TOO_LARGE' 261 description = ('If a message batch in a produce request exceeds the maximum' 262 ' configured segment size.') 263 264 265class NotEnoughReplicasError(BrokerResponseError): 266 errno = 19 267 message = 'NOT_ENOUGH_REPLICAS' 268 description = ('Returned from a produce request when the number of in-sync' 269 ' replicas is lower than the configured minimum and' 270 ' requiredAcks is -1.') 271 retriable = True 272 273 274class NotEnoughReplicasAfterAppendError(BrokerResponseError): 275 errno = 20 276 message = 'NOT_ENOUGH_REPLICAS_AFTER_APPEND' 277 description = ('Returned from a produce request when the message was' 278 ' written to the log, but with fewer in-sync replicas than' 279 ' required.') 280 retriable = True 281 282 283class InvalidRequiredAcksError(BrokerResponseError): 284 errno = 21 285 message = 'INVALID_REQUIRED_ACKS' 286 description = ('Returned from a produce request if the requested' 287 ' requiredAcks is invalid (anything other than -1, 1, or 0).') 288 289 290class IllegalGenerationError(BrokerResponseError): 291 errno = 22 292 message = 'ILLEGAL_GENERATION' 293 description = ('Returned from group membership requests (such as heartbeats)' 294 ' when the generation id provided in the request is not the' 295 ' current generation.') 296 297 298class InconsistentGroupProtocolError(BrokerResponseError): 299 errno = 23 300 message = 'INCONSISTENT_GROUP_PROTOCOL' 301 description = ('Returned in join group when the member provides a protocol' 302 ' type or set of protocols which is not compatible with the' 303 ' current group.') 304 305 306class InvalidGroupIdError(BrokerResponseError): 307 errno = 24 308 message = 'INVALID_GROUP_ID' 309 description = 'Returned in join group when the groupId is empty or null.' 310 311 312class UnknownMemberIdError(BrokerResponseError): 313 errno = 25 314 message = 'UNKNOWN_MEMBER_ID' 315 description = ('Returned from group requests (offset commits/fetches,' 316 ' heartbeats, etc) when the memberId is not in the current' 317 ' generation.') 318 319 320class InvalidSessionTimeoutError(BrokerResponseError): 321 errno = 26 322 message = 'INVALID_SESSION_TIMEOUT' 323 description = ('Return in join group when the requested session timeout is' 324 ' outside of the allowed range on the broker') 325 326 327class RebalanceInProgressError(BrokerResponseError): 328 errno = 27 329 message = 'REBALANCE_IN_PROGRESS' 330 description = ('Returned in heartbeat requests when the coordinator has' 331 ' begun rebalancing the group. This indicates to the client' 332 ' that it should rejoin the group.') 333 334 335class InvalidCommitOffsetSizeError(BrokerResponseError): 336 errno = 28 337 message = 'INVALID_COMMIT_OFFSET_SIZE' 338 description = ('This error indicates that an offset commit was rejected' 339 ' because of oversize metadata.') 340 341 342class TopicAuthorizationFailedError(BrokerResponseError): 343 errno = 29 344 message = 'TOPIC_AUTHORIZATION_FAILED' 345 description = ('Returned by the broker when the client is not authorized to' 346 ' access the requested topic.') 347 348 349class GroupAuthorizationFailedError(BrokerResponseError): 350 errno = 30 351 message = 'GROUP_AUTHORIZATION_FAILED' 352 description = ('Returned by the broker when the client is not authorized to' 353 ' access a particular groupId.') 354 355 356class ClusterAuthorizationFailedError(BrokerResponseError): 357 errno = 31 358 message = 'CLUSTER_AUTHORIZATION_FAILED' 359 description = ('Returned by the broker when the client is not authorized to' 360 ' use an inter-broker or administrative API.') 361 362 363class InvalidTimestampError(BrokerResponseError): 364 errno = 32 365 message = 'INVALID_TIMESTAMP' 366 description = 'The timestamp of the message is out of acceptable range.' 367 368 369class UnsupportedSaslMechanismError(BrokerResponseError): 370 errno = 33 371 message = 'UNSUPPORTED_SASL_MECHANISM' 372 description = 'The broker does not support the requested SASL mechanism.' 373 374 375class IllegalSaslStateError(BrokerResponseError): 376 errno = 34 377 message = 'ILLEGAL_SASL_STATE' 378 description = 'Request is not valid given the current SASL state.' 379 380 381class UnsupportedVersionError(BrokerResponseError): 382 errno = 35 383 message = 'UNSUPPORTED_VERSION' 384 description = 'The version of API is not supported.' 385 386 387class TopicAlreadyExistsError(BrokerResponseError): 388 errno = 36 389 message = 'TOPIC_ALREADY_EXISTS' 390 description = 'Topic with this name already exists.' 391 392 393class InvalidPartitionsError(BrokerResponseError): 394 errno = 37 395 message = 'INVALID_PARTITIONS' 396 description = 'Number of partitions is invalid.' 397 398 399class InvalidReplicationFactorError(BrokerResponseError): 400 errno = 38 401 message = 'INVALID_REPLICATION_FACTOR' 402 description = 'Replication-factor is invalid.' 403 404 405class InvalidReplicationAssignmentError(BrokerResponseError): 406 errno = 39 407 message = 'INVALID_REPLICATION_ASSIGNMENT' 408 description = 'Replication assignment is invalid.' 409 410 411class InvalidConfigurationError(BrokerResponseError): 412 errno = 40 413 message = 'INVALID_CONFIG' 414 description = 'Configuration is invalid.' 415 416 417class NotControllerError(BrokerResponseError): 418 errno = 41 419 message = 'NOT_CONTROLLER' 420 description = 'This is not the correct controller for this cluster.' 421 retriable = True 422 423 424class InvalidRequestError(BrokerResponseError): 425 errno = 42 426 message = 'INVALID_REQUEST' 427 description = ('This most likely occurs because of a request being' 428 ' malformed by the client library or the message was' 429 ' sent to an incompatible broker. See the broker logs' 430 ' for more details.') 431 432 433class UnsupportedForMessageFormatError(BrokerResponseError): 434 errno = 43 435 message = 'UNSUPPORTED_FOR_MESSAGE_FORMAT' 436 description = ('The message format version on the broker does not' 437 ' support this request.') 438 439 440class PolicyViolationError(BrokerResponseError): 441 errno = 44 442 message = 'POLICY_VIOLATION' 443 description = 'Request parameters do not satisfy the configured policy.' 444 445 446class KafkaUnavailableError(KafkaError): 447 pass 448 449 450class KafkaTimeoutError(KafkaError): 451 pass 452 453 454class FailedPayloadsError(KafkaError): 455 def __init__(self, payload, *args): 456 super(FailedPayloadsError, self).__init__(*args) 457 self.payload = payload 458 459 460class KafkaConnectionError(KafkaError): 461 retriable = True 462 invalid_metadata = True 463 464 465class ConnectionError(KafkaConnectionError): 466 """Deprecated""" 467 468 469class BufferUnderflowError(KafkaError): 470 pass 471 472 473class ChecksumError(KafkaError): 474 pass 475 476 477class ConsumerFetchSizeTooSmall(KafkaError): 478 pass 479 480 481class ConsumerNoMoreData(KafkaError): 482 pass 483 484 485class ProtocolError(KafkaError): 486 pass 487 488 489class UnsupportedCodecError(KafkaError): 490 pass 491 492 493class KafkaConfigurationError(KafkaError): 494 pass 495 496 497class QuotaViolationError(KafkaError): 498 pass 499 500 501class AsyncProducerQueueFull(KafkaError): 502 def __init__(self, failed_msgs, *args): 503 super(AsyncProducerQueueFull, self).__init__(*args) 504 self.failed_msgs = failed_msgs 505 506 507def _iter_broker_errors(): 508 for name, obj in inspect.getmembers(sys.modules[__name__]): 509 if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError: 510 yield obj 511 512 513kafka_errors = dict([(x.errno, x) for x in _iter_broker_errors()]) 514 515 516def for_code(error_code): 517 return kafka_errors.get(error_code, UnknownError) 518 519 520def check_error(response): 521 if isinstance(response, Exception): 522 raise response 523 if response.error: 524 error_class = kafka_errors.get(response.error, UnknownError) 525 raise error_class(response) 526 527 528RETRY_BACKOFF_ERROR_TYPES = ( 529 KafkaUnavailableError, LeaderNotAvailableError, 530 KafkaConnectionError, FailedPayloadsError 531) 532 533 534RETRY_REFRESH_ERROR_TYPES = ( 535 NotLeaderForPartitionError, UnknownTopicOrPartitionError, 536 LeaderNotAvailableError, KafkaConnectionError 537) 538 539 540RETRY_ERROR_TYPES = RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES 541