from __future__ import absolute_import, division

import collections
import copy
import functools
import logging
import time

from kafka.vendor import six

from kafka.coordinator.base import BaseCoordinator, Generation
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.coordinator.protocol import ConsumerProtocol
import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics import AnonMeasurable
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.commit import OffsetCommitRequest, OffsetFetchRequest
from kafka.structs import OffsetAndMetadata, TopicPartition
from kafka.util import WeakMethod


log = logging.getLogger(__name__)


class ConsumerCoordinator(BaseCoordinator):
    """This class manages the coordination process with the consumer coordinator."""
    DEFAULT_CONFIG = {
        'group_id': 'kafka-python-default-group',
        'enable_auto_commit': True,
        'auto_commit_interval_ms': 5000,
        'default_offset_commit_callback': None,
        'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor),
        'session_timeout_ms': 10000,
        'heartbeat_interval_ms': 3000,
        'max_poll_interval_ms': 300000,
        'retry_backoff_ms': 100,
        'api_version': (0, 10, 1),
        'exclude_internal_topics': True,
        'metric_group_prefix': 'consumer'
    }

    def __init__(self, client, subscription, metrics, **configs):
        """Initialize the coordination manager.

        Keyword Arguments:
            group_id (str): name of the consumer group to join for dynamic
                partition assignment (if enabled), and to use for fetching and
                committing offsets. Default: 'kafka-python-default-group'
            enable_auto_commit (bool): If True, the consumer's offsets will be
                periodically committed in the background. Default: True.
            auto_commit_interval_ms (int): milliseconds between automatic
                offset commits, if enable_auto_commit is True. Default: 5000.
            default_offset_commit_callback (callable): called as
                callback(offsets, exception); exception will be either an
                Exception instance or None. This callback can be used to
                trigger custom actions when a commit request completes.
            assignors (list): List of objects to use to distribute partition
                ownership amongst consumer instances when group management is
                used. Default: [RangePartitionAssignor, RoundRobinPartitionAssignor]
            heartbeat_interval_ms (int): The expected time in milliseconds
                between heartbeats to the consumer coordinator when using
                Kafka's group management feature. Heartbeats are used to ensure
                that the consumer's session stays active and to facilitate
                rebalancing when new consumers join or leave the group. The
                value must be set lower than session_timeout_ms, but typically
                should be set no higher than 1/3 of that value. It can be
                adjusted even lower to control the expected time for normal
                rebalances. Default: 3000
            session_timeout_ms (int): The timeout used to detect failures when
                using Kafka's group management facilities. Default: 10000
            retry_backoff_ms (int): Milliseconds to backoff when retrying on
                errors. Default: 100.
            exclude_internal_topics (bool): Whether records from internal topics
                (such as offsets) should be exposed to the consumer. If set to
                True the only way to receive records from an internal topic is
                subscribing to it. Requires 0.10+. Default: True
        """
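        # Only keys present in DEFAULT_CONFIG are copied into self.config
        # below; the full **configs dict is passed through to BaseCoordinator,
        # which applies its own defaults, and any remaining keys are ignored
        # by this class.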
        super(ConsumerCoordinator, self).__init__(client, metrics, **configs)

        self.config = copy.copy(self.DEFAULT_CONFIG)
        for key in self.config:
            if key in configs:
                self.config[key] = configs[key]

        self._subscription = subscription
        self._is_leader = False
        self._joined_subscription = set()
        self._metadata_snapshot = self._build_metadata_snapshot(subscription, client.cluster)
        self._assignment_snapshot = None
        self._cluster = client.cluster
        self.auto_commit_interval = self.config['auto_commit_interval_ms'] / 1000
        self.next_auto_commit_deadline = None
        self.completed_offset_commits = collections.deque()

        if self.config['default_offset_commit_callback'] is None:
            self.config['default_offset_commit_callback'] = self._default_offset_commit_callback

        if self.config['group_id'] is not None:
            if self.config['api_version'] >= (0, 9):
                if not self.config['assignors']:
                    raise Errors.KafkaConfigurationError('Coordinator requires assignors')
            if self.config['api_version'] < (0, 10, 1):
                if self.config['max_poll_interval_ms'] != self.config['session_timeout_ms']:
                    raise Errors.KafkaConfigurationError(
                        "Broker version %s does not support "
                        "different values for max_poll_interval_ms "
                        "and session_timeout_ms" % (self.config['api_version'],))

        if self.config['enable_auto_commit']:
            if self.config['api_version'] < (0, 8, 1):
                log.warning('Broker version (%s) does not support offset'
                            ' commits; disabling auto-commit.',
                            self.config['api_version'])
                self.config['enable_auto_commit'] = False
            elif self.config['group_id'] is None:
                log.warning('group_id is None: disabling auto-commit.')
                self.config['enable_auto_commit'] = False
            else:
                self.next_auto_commit_deadline = time.time() + self.auto_commit_interval

        self.consumer_sensors = ConsumerCoordinatorMetrics(
            metrics, self.config['metric_group_prefix'], self._subscription)

        self._cluster.request_update()
        self._cluster.add_listener(WeakMethod(self._handle_metadata_update))

    def __del__(self):
        if hasattr(self, '_cluster') and self._cluster:
            self._cluster.remove_listener(WeakMethod(self._handle_metadata_update))
        super(ConsumerCoordinator, self).__del__()

    def protocol_type(self):
        return ConsumerProtocol.PROTOCOL_TYPE

    def group_protocols(self):
        """Returns list of preferred (protocol, metadata) tuples"""
        if self._subscription.subscription is None:
            raise Errors.IllegalStateError('Consumer has not subscribed to topics')
        # dpkp note: I really dislike this.
        # why? because we are using this strange method group_protocols,
        # which is seemingly innocuous, to set internal state (_joined_subscription)
        # that is later used to check whether metadata has changed since we joined a group
        # but there is no guarantee that this method, group_protocols, will get called
        # in the correct sequence or that it will only be called when we want it to be.
        # So this really should be moved elsewhere, but I don't have the energy to
        # work that out right now. If you read this at some later date after the mutable
        # state has bitten you... I'm sorry! It mimics the java client, and that's the
        # best I've got for now.
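        # Build one (assignor.name, metadata) tuple per configured assignor,
        # in the order they were configured; this list is advertised to the
        # coordinator as the supported protocols when joining the group.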
        self._joined_subscription = set(self._subscription.subscription)
        metadata_list = []
        for assignor in self.config['assignors']:
            metadata = assignor.metadata(self._joined_subscription)
            group_protocol = (assignor.name, metadata)
            metadata_list.append(group_protocol)
        return metadata_list

    def _handle_metadata_update(self, cluster):
        # if we encounter any unauthorized topics, raise an exception
        if cluster.unauthorized_topics:
            raise Errors.TopicAuthorizationFailedError(cluster.unauthorized_topics)

        if self._subscription.subscribed_pattern:
            topics = []
            for topic in cluster.topics(self.config['exclude_internal_topics']):
                if self._subscription.subscribed_pattern.match(topic):
                    topics.append(topic)

            if set(topics) != self._subscription.subscription:
                self._subscription.change_subscription(topics)
                self._client.set_topics(self._subscription.group_subscription())

        # check if there are any changes to the metadata which should trigger
        # a rebalance
        if self._subscription.partitions_auto_assigned():
            metadata_snapshot = self._build_metadata_snapshot(self._subscription, cluster)
            if self._metadata_snapshot != metadata_snapshot:
                self._metadata_snapshot = metadata_snapshot

                # If we haven't got group coordinator support,
                # just assign all partitions locally
                if self._auto_assign_all_partitions():
                    self._subscription.assign_from_subscribed([
                        TopicPartition(topic, partition)
                        for topic in self._subscription.subscription
                        for partition in self._metadata_snapshot[topic]
                    ])

    def _auto_assign_all_partitions(self):
        # For users that use "subscribe" without group support,
        # we will simply assign all partitions to this consumer
        if self.config['api_version'] < (0, 9):
            return True
        elif self.config['group_id'] is None:
            return True
        else:
            return False

    def _build_metadata_snapshot(self, subscription, cluster):
        metadata_snapshot = {}
        for topic in subscription.group_subscription():
            partitions = cluster.partitions_for_topic(topic) or []
            metadata_snapshot[topic] = set(partitions)
        return metadata_snapshot

    def _lookup_assignor(self, name):
        for assignor in self.config['assignors']:
            if assignor.name == name:
                return assignor
        return None

    def _on_join_complete(self, generation, member_id, protocol,
                          member_assignment_bytes):
        # only the leader is responsible for monitoring for metadata changes
        # (i.e. partition changes)
        if not self._is_leader:
            self._assignment_snapshot = None

        assignor = self._lookup_assignor(protocol)
        assert assignor, 'Coordinator selected invalid assignment protocol: %s' % (protocol,)

        assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes)

        # set the flag to refresh last committed offsets
        self._subscription.needs_fetch_committed_offsets = True

        # update partition assignment
        self._subscription.assign_from_subscribed(assignment.partitions())

        # give the assignor a chance to update internal state
        # based on the received assignment
        assignor.on_assignment(assignment)

        # reschedule the auto commit starting from now
        self.next_auto_commit_deadline = time.time() + self.auto_commit_interval

        assigned = set(self._subscription.assigned_partitions())
        log.info("Setting newly assigned partitions %s for group %s",
                 assigned, self.group_id)

        # execute the user's callback after rebalance
        if self._subscription.listener:
            try:
                self._subscription.listener.on_partitions_assigned(assigned)
            except Exception:
                log.exception("User provided listener %s for group %s"
                              " failed on partition assignment: %s",
                              self._subscription.listener, self.group_id,
                              assigned)

    def poll(self):
        """
        Poll for coordinator events. Only applicable if group_id is set, and
        broker version supports GroupCoordinators. This ensures that the
        coordinator is known, and if using automatic partition assignment,
        ensures that the consumer has joined the group. This also handles
        periodic offset commits if they are enabled.
        """
        if self.group_id is None or self.config['api_version'] < (0, 8, 2):
            return

        self._invoke_completed_offset_commit_callbacks()
        self.ensure_coordinator_ready()

        if self.config['api_version'] >= (0, 9) and self._subscription.partitions_auto_assigned():
            if self.need_rejoin():
                # due to a race condition between the initial metadata fetch and the
                # initial rebalance, we need to ensure that the metadata is fresh
                # before joining initially, and then request the metadata update. If
                # metadata update arrives while the rebalance is still pending (for
                # example, when the join group is still inflight), then we will lose
                # track of the fact that we need to rebalance again to reflect the
                # change to the topic subscription. Without ensuring that the
                # metadata is fresh, any metadata update that changes the topic
                # subscriptions and arrives while a rebalance is in progress will
                # essentially be ignored. See KAFKA-3949 for the complete
                # description of the problem.
                if self._subscription.subscribed_pattern:
                    metadata_update = self._client.cluster.request_update()
                    self._client.poll(future=metadata_update)

                self.ensure_active_group()

        self.poll_heartbeat()

        self._maybe_auto_commit_offsets_async()

    def time_to_next_poll(self):
        """Return seconds (float) remaining until :meth:`.poll` should be called again"""
        if not self.config['enable_auto_commit']:
            return self.time_to_next_heartbeat()

        if time.time() > self.next_auto_commit_deadline:
            return 0

        return min(self.next_auto_commit_deadline - time.time(),
                   self.time_to_next_heartbeat())

    def _perform_assignment(self, leader_id, assignment_strategy, members):
        assignor = self._lookup_assignor(assignment_strategy)
        assert assignor, 'Invalid assignment protocol: %s' % (assignment_strategy,)
        member_metadata = {}
        all_subscribed_topics = set()
        for member_id, metadata_bytes in members:
            metadata = ConsumerProtocol.METADATA.decode(metadata_bytes)
            member_metadata[member_id] = metadata
            all_subscribed_topics.update(metadata.subscription)  # pylint: disable-msg=no-member

        # the leader will begin watching for changes to any of the topics
        # the group is interested in, which ensures that all metadata changes
        # will eventually be seen
        # Because assignment typically happens within response callbacks,
        # we cannot block on metadata updates here (no recursion into poll())
        self._subscription.group_subscribe(all_subscribed_topics)
        self._client.set_topics(self._subscription.group_subscription())

        # keep track of the metadata used for assignment so that we can check
        # after rebalance completion whether anything has changed
        self._cluster.request_update()
        self._is_leader = True
        self._assignment_snapshot = self._metadata_snapshot

        log.debug("Performing assignment for group %s using strategy %s"
                  " with subscriptions %s", self.group_id, assignor.name,
                  member_metadata)

        assignments = assignor.assign(self._cluster, member_metadata)

        log.debug("Finished assignment for group %s: %s", self.group_id, assignments)

        group_assignment = {}
        for member_id, assignment in six.iteritems(assignments):
            group_assignment[member_id] = assignment
        return group_assignment

    def _on_join_prepare(self, generation, member_id):
        # commit offsets prior to rebalance if auto-commit enabled
        self._maybe_auto_commit_offsets_sync()

        # execute the user's callback before rebalance
        log.info("Revoking previously assigned partitions %s for group %s",
                 self._subscription.assigned_partitions(), self.group_id)
        if self._subscription.listener:
            try:
                revoked = set(self._subscription.assigned_partitions())
                self._subscription.listener.on_partitions_revoked(revoked)
            except Exception:
                log.exception("User provided subscription listener %s"
                              " for group %s failed on_partitions_revoked",
                              self._subscription.listener, self.group_id)

        self._is_leader = False
        self._subscription.reset_group_subscription()

    def need_rejoin(self):
        """Check whether the group should be rejoined

        Returns:
            bool: True if consumer should rejoin group, False otherwise
        """
        if not self._subscription.partitions_auto_assigned():
            return False

        if self._auto_assign_all_partitions():
            return False

        # we need to rejoin if we performed the assignment and metadata has changed
        if (self._assignment_snapshot is not None
            and self._assignment_snapshot != self._metadata_snapshot):
            return True

        # we need to join if our subscription has changed since the last join
        if (self._joined_subscription is not None
            and self._joined_subscription != self._subscription.subscription):
            return True

        return super(ConsumerCoordinator, self).need_rejoin()

    def refresh_committed_offsets_if_needed(self):
        """Fetch committed offsets for assigned partitions."""
        if self._subscription.needs_fetch_committed_offsets:
            offsets = self.fetch_committed_offsets(self._subscription.assigned_partitions())
            for partition, offset in six.iteritems(offsets):
                # verify assignment is still active
                if self._subscription.is_assigned(partition):
                    self._subscription.assignment[partition].committed = offset.offset
            self._subscription.needs_fetch_committed_offsets = False

    def fetch_committed_offsets(self, partitions):
        """Fetch the current committed offsets for specified partitions

        Arguments:
            partitions (list of TopicPartition): partitions to fetch

        Returns:
            dict: {TopicPartition: OffsetAndMetadata}
        """
        if not partitions:
            return {}

        while True:
            self.ensure_coordinator_ready()

            # contact coordinator to fetch committed offsets
            future = self._send_offset_fetch_request(partitions)
            self._client.poll(future=future)

            if future.succeeded():
                return future.value

            if not future.retriable():
                raise future.exception  # pylint: disable-msg=raising-bad-type

            time.sleep(self.config['retry_backoff_ms'] / 1000)

    def close(self, autocommit=True):
        """Close the coordinator, leave the current group,
        and reset local generation / member_id.

        Keyword Arguments:
            autocommit (bool): If auto-commit is configured for this consumer,
                this optional flag causes the consumer to attempt to commit any
                pending consumed offsets prior to close. Default: True
        """
        try:
            if autocommit:
                self._maybe_auto_commit_offsets_sync()
        finally:
            super(ConsumerCoordinator, self).close()

    def _invoke_completed_offset_commit_callbacks(self):
        while self.completed_offset_commits:
            callback, offsets, exception = self.completed_offset_commits.popleft()
            callback(offsets, exception)
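
    # Illustrative use of the two commit entry points below (assuming
    # `coordinator` is an instance of this class and `offsets` is a
    # {TopicPartition: OffsetAndMetadata} dict; the names are hypothetical):
    #
    #   coordinator.commit_offsets_async(offsets, callback=on_commit)
    #   coordinator.commit_offsets_sync(offsets)  # blocks, retries until done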
    def commit_offsets_async(self, offsets, callback=None):
        """Commit specific offsets asynchronously.

        Arguments:
            offsets (dict {TopicPartition: OffsetAndMetadata}): what to commit
            callback (callable, optional): called as callback(offsets, response);
                response will be either an Exception or an OffsetCommitResponse
                struct. This callback can be used to trigger custom actions when
                a commit request completes.

        Returns:
            kafka.future.Future
        """
        self._invoke_completed_offset_commit_callbacks()
        if not self.coordinator_unknown():
            future = self._do_commit_offsets_async(offsets, callback)
        else:
            # we don't know the current coordinator, so try to find it and then
            # send the commit or fail (we don't want recursive retries which can
            # cause offset commits to arrive out of order). Note that there may
            # be multiple offset commits chained to the same coordinator lookup
            # request. This is fine because the listeners will be invoked in the
            # same order that they were added. Note also that BaseCoordinator
            # prevents multiple concurrent coordinator lookup requests.
            future = self.lookup_coordinator()
            future.add_callback(lambda r: functools.partial(self._do_commit_offsets_async, offsets, callback)())
            if callback:
                future.add_errback(lambda e: self.completed_offset_commits.appendleft((callback, offsets, e)))

        # ensure the commit has a chance to be transmitted (without blocking on
        # its completion). Note that commits are treated as heartbeats by the
        # coordinator, so there is no need to explicitly allow heartbeats
        # through delayed task execution.
        self._client.poll(timeout_ms=0)  # no wakeup if we add that feature

        return future

    def _do_commit_offsets_async(self, offsets, callback=None):
        assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
        assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
        assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
                       offsets.values()))
        if callback is None:
            callback = self.config['default_offset_commit_callback']
        self._subscription.needs_fetch_committed_offsets = True
        future = self._send_offset_commit_request(offsets)
        future.add_both(lambda res: self.completed_offset_commits.appendleft((callback, offsets, res)))
        return future

    def commit_offsets_sync(self, offsets):
        """Commit specific offsets synchronously.

        This method will retry until the commit completes successfully or an
        unrecoverable error is encountered.

        Arguments:
            offsets (dict {TopicPartition: OffsetAndMetadata}): what to commit

        Raises error on failure
        """
        assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
        assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
        assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
                       offsets.values()))
        self._invoke_completed_offset_commit_callbacks()
        if not offsets:
            return

        while True:
            self.ensure_coordinator_ready()

            future = self._send_offset_commit_request(offsets)
            self._client.poll(future=future)

            if future.succeeded():
                return future.value

            if not future.retriable():
                raise future.exception  # pylint: disable-msg=raising-bad-type

            time.sleep(self.config['retry_backoff_ms'] / 1000)

    def _maybe_auto_commit_offsets_sync(self):
        if self.config['enable_auto_commit']:
            try:
                self.commit_offsets_sync(self._subscription.all_consumed_offsets())

            # The three main group membership errors are known and should not
            # require a stacktrace -- just a warning
            except (Errors.UnknownMemberIdError,
                    Errors.IllegalGenerationError,
                    Errors.RebalanceInProgressError):
                log.warning("Offset commit failed: group membership out of date."
                            " This is likely to cause duplicate message"
                            " delivery.")
            except Exception:
                log.exception("Offset commit failed: This is likely to cause"
                              " duplicate message delivery")
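
    # The request builder below selects the OffsetCommitRequest version from
    # the configured broker api_version: v2 (0.9+) carries the generation id,
    # member id and retention time; v1 (0.8.2+) commits to Kafka with a
    # per-partition timestamp of -1; v0 (0.8.1) stores offsets in ZooKeeper.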
    def _send_offset_commit_request(self, offsets):
        """Commit offsets for the specified list of topics and partitions.

        This is a non-blocking call which returns a request future that can be
        polled in the case of a synchronous commit or ignored in the
        asynchronous case.

        Arguments:
            offsets (dict of {TopicPartition: OffsetAndMetadata}): what should
                be committed

        Returns:
            Future: indicating whether the commit was successful or not
        """
        assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
        assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
        assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
                       offsets.values()))
        if not offsets:
            log.debug('No offsets to commit')
            return Future().success(None)

        node_id = self.coordinator()
        if node_id is None:
            return Future().failure(Errors.GroupCoordinatorNotAvailableError)

        # create the offset commit request
        offset_data = collections.defaultdict(dict)
        for tp, offset in six.iteritems(offsets):
            offset_data[tp.topic][tp.partition] = offset

        if self._subscription.partitions_auto_assigned():
            generation = self.generation()
        else:
            generation = Generation.NO_GENERATION

        # if the generation is None, we are not part of an active group
        # (and we expect to be). The only thing we can do is fail the commit
        # and let the user rejoin the group in poll()
        if self.config['api_version'] >= (0, 9) and generation is None:
            return Future().failure(Errors.CommitFailedError())

        if self.config['api_version'] >= (0, 9):
            request = OffsetCommitRequest[2](
                self.group_id,
                generation.generation_id,
                generation.member_id,
                OffsetCommitRequest[2].DEFAULT_RETENTION_TIME,
                [(
                    topic, [(
                        partition,
                        offset.offset,
                        offset.metadata
                    ) for partition, offset in six.iteritems(partitions)]
                ) for topic, partitions in six.iteritems(offset_data)]
            )
        elif self.config['api_version'] >= (0, 8, 2):
            request = OffsetCommitRequest[1](
                self.group_id, -1, '',
                [(
                    topic, [(
                        partition,
                        offset.offset,
                        -1,
                        offset.metadata
                    ) for partition, offset in six.iteritems(partitions)]
                ) for topic, partitions in six.iteritems(offset_data)]
            )
        elif self.config['api_version'] >= (0, 8, 1):
            request = OffsetCommitRequest[0](
                self.group_id,
                [(
                    topic, [(
                        partition,
                        offset.offset,
                        offset.metadata
                    ) for partition, offset in six.iteritems(partitions)]
                ) for topic, partitions in six.iteritems(offset_data)]
            )

        log.debug("Sending offset-commit request with %s for group %s to %s",
                  offsets, self.group_id, node_id)

        future = Future()
        _f = self._client.send(node_id, request)
        _f.add_callback(self._handle_offset_commit_response, offsets, future, time.time())
        _f.add_errback(self._failed_request, node_id, request, future)
        return future

    def _handle_offset_commit_response(self, offsets, future, send_time, response):
        # TODO look at adding request_latency_ms to response (like java kafka)
        self.consumer_sensors.commit_latency.record((time.time() - send_time) * 1000)
        unauthorized_topics = set()

        for topic, partitions in response.topics:
            for partition, error_code in partitions:
                tp = TopicPartition(topic, partition)
                offset = offsets[tp]

                error_type = Errors.for_code(error_code)
                if error_type is Errors.NoError:
                    log.debug("Group %s committed offset %s for partition %s",
                              self.group_id, offset, tp)
                    if self._subscription.is_assigned(tp):
                        self._subscription.assignment[tp].committed = offset.offset
                elif error_type is Errors.GroupAuthorizationFailedError:
                    log.error("Not authorized to commit offsets for group %s",
                              self.group_id)
                    future.failure(error_type(self.group_id))
                    return
                elif error_type is Errors.TopicAuthorizationFailedError:
                    unauthorized_topics.add(topic)
                elif error_type in (Errors.OffsetMetadataTooLargeError,
                                    Errors.InvalidCommitOffsetSizeError):
                    # raise the error to the user
                    log.debug("OffsetCommit for group %s failed on partition %s"
                              " %s", self.group_id, tp, error_type.__name__)
                    future.failure(error_type())
                    return
                elif error_type is Errors.GroupLoadInProgressError:
                    # just retry
                    log.debug("OffsetCommit for group %s failed: %s",
                              self.group_id, error_type.__name__)
                    future.failure(error_type(self.group_id))
                    return
                elif error_type in (Errors.GroupCoordinatorNotAvailableError,
                                    Errors.NotCoordinatorForGroupError,
                                    Errors.RequestTimedOutError):
                    log.debug("OffsetCommit for group %s failed: %s",
                              self.group_id, error_type.__name__)
                    self.coordinator_dead(error_type())
                    future.failure(error_type(self.group_id))
                    return
                elif error_type in (Errors.UnknownMemberIdError,
                                    Errors.IllegalGenerationError,
                                    Errors.RebalanceInProgressError):
                    # need to re-join group
                    error = error_type(self.group_id)
                    log.debug("OffsetCommit for group %s failed: %s",
                              self.group_id, error)
                    self.reset_generation()
                    future.failure(Errors.CommitFailedError())
                    return
                else:
                    log.error("Group %s failed to commit partition %s at offset"
                              " %s: %s", self.group_id, tp, offset,
                              error_type.__name__)
                    future.failure(error_type())
                    return

        if unauthorized_topics:
            log.error("Not authorized to commit to topics %s for group %s",
                      unauthorized_topics, self.group_id)
            future.failure(Errors.TopicAuthorizationFailedError(unauthorized_topics))
        else:
            future.success(None)
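
    # Note: the fetch below uses OffsetFetchRequest v1 (brokers >= 0.8.2),
    # which reads offsets stored in Kafka, and falls back to v0
    # (ZooKeeper-stored offsets) for older brokers.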
    def _send_offset_fetch_request(self, partitions):
        """Fetch the committed offsets for a set of partitions.

        This is a non-blocking call. The returned future can be polled to get
        the actual offsets returned from the broker.

        Arguments:
            partitions (list of TopicPartition): the partitions to fetch

        Returns:
            Future: resolves to dict of offsets: {TopicPartition: int}
        """
        assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
        assert all(map(lambda k: isinstance(k, TopicPartition), partitions))
        if not partitions:
            return Future().success({})

        node_id = self.coordinator()
        if node_id is None:
            return Future().failure(Errors.GroupCoordinatorNotAvailableError)

        # Verify node is ready
        if not self._client.ready(node_id):
            log.debug("Node %s not ready -- failing offset fetch request",
                      node_id)
            return Future().failure(Errors.NodeNotReadyError)

        log.debug("Group %s fetching committed offsets for partitions: %s",
                  self.group_id, partitions)
        # construct the request
        topic_partitions = collections.defaultdict(set)
        for tp in partitions:
            topic_partitions[tp.topic].add(tp.partition)

        if self.config['api_version'] >= (0, 8, 2):
            request = OffsetFetchRequest[1](
                self.group_id,
                list(topic_partitions.items())
            )
        else:
            request = OffsetFetchRequest[0](
                self.group_id,
                list(topic_partitions.items())
            )

        # send the request with a callback
        future = Future()
        _f = self._client.send(node_id, request)
        _f.add_callback(self._handle_offset_fetch_response, future)
        _f.add_errback(self._failed_request, node_id, request, future)
        return future

    def _handle_offset_fetch_response(self, future, response):
        offsets = {}
        for topic, partitions in response.topics:
            for partition, offset, metadata, error_code in partitions:
                tp = TopicPartition(topic, partition)
                error_type = Errors.for_code(error_code)
                if error_type is not Errors.NoError:
                    error = error_type()
                    log.debug("Group %s failed to fetch offset for partition"
                              " %s: %s", self.group_id, tp, error)
                    if error_type is Errors.GroupLoadInProgressError:
                        # just retry
                        future.failure(error)
                    elif error_type is Errors.NotCoordinatorForGroupError:
                        # re-discover the coordinator and retry
                        self.coordinator_dead(error_type())
                        future.failure(error)
                    elif error_type is Errors.UnknownTopicOrPartitionError:
                        log.warning("OffsetFetchRequest -- unknown topic %s"
                                    " (have you committed any offsets yet?)",
                                    topic)
                        continue
                    else:
                        log.error("Unknown error fetching offsets for %s: %s",
                                  tp, error)
                        future.failure(error)
                    return
                elif offset >= 0:
                    # record the position with the offset
                    # (-1 indicates no committed offset to fetch)
                    offsets[tp] = OffsetAndMetadata(offset, metadata)
                else:
                    log.debug("Group %s has no committed offset for partition"
                              " %s", self.group_id, tp)
        future.success(offsets)

    def _default_offset_commit_callback(self, offsets, exception):
        if exception is not None:
            log.error("Offset commit failed: %s", exception)

    def _commit_offsets_async_on_complete(self, offsets, exception):
        if exception is not None:
            log.warning("Auto offset commit failed for group %s: %s",
                        self.group_id, exception)
            if getattr(exception, 'retriable', False):
                self.next_auto_commit_deadline = min(
                    time.time() + self.config['retry_backoff_ms'] / 1000,
                    self.next_auto_commit_deadline)
        else:
            log.debug("Completed autocommit of offsets %s for group %s",
                      offsets, self.group_id)
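
    # Auto-commit scheduling: while the coordinator is unknown the deadline is
    # pushed back by retry_backoff_ms; otherwise, once the deadline passes,
    # all consumed offsets are committed asynchronously and the deadline is
    # reset to one auto_commit_interval from now.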
    def _maybe_auto_commit_offsets_async(self):
        if self.config['enable_auto_commit']:
            if self.coordinator_unknown():
                self.next_auto_commit_deadline = time.time() + self.config['retry_backoff_ms'] / 1000
            elif time.time() > self.next_auto_commit_deadline:
                self.next_auto_commit_deadline = time.time() + self.auto_commit_interval
                self.commit_offsets_async(self._subscription.all_consumed_offsets(),
                                          self._commit_offsets_async_on_complete)


class ConsumerCoordinatorMetrics(object):
    def __init__(self, metrics, metric_group_prefix, subscription):
        self.metrics = metrics
        self.metric_group_name = '%s-coordinator-metrics' % (metric_group_prefix,)

        self.commit_latency = metrics.sensor('commit-latency')
        self.commit_latency.add(metrics.metric_name(
            'commit-latency-avg', self.metric_group_name,
            'The average time taken for a commit request'), Avg())
        self.commit_latency.add(metrics.metric_name(
            'commit-latency-max', self.metric_group_name,
            'The max time taken for a commit request'), Max())
        self.commit_latency.add(metrics.metric_name(
            'commit-rate', self.metric_group_name,
            'The number of commit calls per second'), Rate(sampled_stat=Count()))

        num_parts = AnonMeasurable(lambda config, now:
                                   len(subscription.assigned_partitions()))
        metrics.add_metric(metrics.metric_name(
            'assigned-partitions', self.metric_group_name,
            'The number of partitions currently assigned to this consumer'),
            num_parts)