from __future__ import absolute_import, division

import collections
import copy
import functools
import logging
import time

from kafka.vendor import six

from kafka.coordinator.base import BaseCoordinator, Generation
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.coordinator.protocol import ConsumerProtocol
import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics import AnonMeasurable
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.commit import OffsetCommitRequest, OffsetFetchRequest
from kafka.structs import OffsetAndMetadata, TopicPartition
from kafka.util import WeakMethod


log = logging.getLogger(__name__)


class ConsumerCoordinator(BaseCoordinator):
    """This class manages the coordination process with the consumer coordinator."""
    DEFAULT_CONFIG = {
        'group_id': 'kafka-python-default-group',
        'enable_auto_commit': True,
        'auto_commit_interval_ms': 5000,
        'default_offset_commit_callback': None,
        'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor),
        'session_timeout_ms': 10000,
        'heartbeat_interval_ms': 3000,
        'max_poll_interval_ms': 300000,
        'retry_backoff_ms': 100,
        'api_version': (0, 10, 1),
        'exclude_internal_topics': True,
        'metric_group_prefix': 'consumer'
    }
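
    # Each key above can be overridden by a matching keyword argument passed
    # to __init__ (see the config-merging loop there); keys not listed in
    # DEFAULT_CONFIG are ignored by this class.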

    def __init__(self, client, subscription, metrics, **configs):
        """Initialize the coordination manager.

        Keyword Arguments:
            group_id (str): name of the consumer group to join for dynamic
                partition assignment (if enabled), and to use for fetching and
                committing offsets. Default: 'kafka-python-default-group'
            enable_auto_commit (bool): If true the consumer's offset will be
                periodically committed in the background. Default: True.
            auto_commit_interval_ms (int): milliseconds between automatic
                offset commits, if enable_auto_commit is True. Default: 5000.
            default_offset_commit_callback (callable): called as
                callback(offsets, exception); exception will be None for a
                successful commit, or the Exception that caused it to fail.
                This callback can be used to trigger custom actions when a
                commit request completes.
            assignors (list): List of objects to use to distribute partition
                ownership amongst consumer instances when group management is
                used. Default: [RangePartitionAssignor, RoundRobinPartitionAssignor]
            heartbeat_interval_ms (int): The expected time in milliseconds
                between heartbeats to the consumer coordinator when using
                Kafka's group management feature. Heartbeats are used to ensure
                that the consumer's session stays active and to facilitate
                rebalancing when new consumers join or leave the group. The
                value must be set lower than session_timeout_ms, but typically
                should be set no higher than 1/3 of that value. It can be
                adjusted even lower to control the expected time for normal
                rebalances. Default: 3000
            session_timeout_ms (int): The timeout used to detect failures when
                using Kafka's group management facilities. Default: 10000
            max_poll_interval_ms (int): The maximum delay between calls to
                poll() when using consumer group management. If this interval
                is exceeded the consumer is considered failed and the group
                will rebalance. Default: 300000
            retry_backoff_ms (int): Milliseconds to backoff when retrying on
                errors. Default: 100.
            exclude_internal_topics (bool): Whether records from internal topics
                (such as offsets) should be exposed to the consumer. If set to
                True the only way to receive records from an internal topic is
                subscribing to it. Requires 0.10+. Default: True
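
        Example:
            An illustrative construction; in practice this class is created
            internally by KafkaConsumer, and the argument values shown here
            are assumed::

                coordinator = ConsumerCoordinator(
                    client, subscription, metrics,
                    group_id='my-group',
                    enable_auto_commit=True,
                    auto_commit_interval_ms=5000)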
        """
        super(ConsumerCoordinator, self).__init__(client, metrics, **configs)

        self.config = copy.copy(self.DEFAULT_CONFIG)
        for key in self.config:
            if key in configs:
                self.config[key] = configs[key]

        self._subscription = subscription
        self._is_leader = False
        self._joined_subscription = set()
        self._metadata_snapshot = self._build_metadata_snapshot(subscription, client.cluster)
        self._assignment_snapshot = None
        self._cluster = client.cluster
        self.auto_commit_interval = self.config['auto_commit_interval_ms'] / 1000
        self.next_auto_commit_deadline = None
        self.completed_offset_commits = collections.deque()

        if self.config['default_offset_commit_callback'] is None:
            self.config['default_offset_commit_callback'] = self._default_offset_commit_callback

        if self.config['group_id'] is not None:
            if self.config['api_version'] >= (0, 9):
                if not self.config['assignors']:
                    raise Errors.KafkaConfigurationError('Coordinator requires assignors')
            if self.config['api_version'] < (0, 10, 1):
                if self.config['max_poll_interval_ms'] != self.config['session_timeout_ms']:
                    raise Errors.KafkaConfigurationError("Broker version %s does not support "
                                                         "different values for max_poll_interval_ms "
                                                         "and session_timeout_ms"
                                                         % (self.config['api_version'],))

        if self.config['enable_auto_commit']:
            if self.config['api_version'] < (0, 8, 1):
                log.warning('Broker version (%s) does not support offset'
                            ' commits; disabling auto-commit.',
                            self.config['api_version'])
                self.config['enable_auto_commit'] = False
            elif self.config['group_id'] is None:
                log.warning('group_id is None: disabling auto-commit.')
                self.config['enable_auto_commit'] = False
            else:
                self.next_auto_commit_deadline = time.time() + self.auto_commit_interval

        self.consumer_sensors = ConsumerCoordinatorMetrics(
            metrics, self.config['metric_group_prefix'], self._subscription)

        self._cluster.request_update()
        self._cluster.add_listener(WeakMethod(self._handle_metadata_update))

    def __del__(self):
        if hasattr(self, '_cluster') and self._cluster:
            self._cluster.remove_listener(WeakMethod(self._handle_metadata_update))
        super(ConsumerCoordinator, self).__del__()

    def protocol_type(self):
        return ConsumerProtocol.PROTOCOL_TYPE

    def group_protocols(self):
        """Returns list of preferred (protocol, metadata) tuples"""
        if self._subscription.subscription is None:
            raise Errors.IllegalStateError('Consumer has not subscribed to topics')
        # dpkp note: I really dislike this.
        # why? because we are using this strange method group_protocols,
        # which is seemingly innocuous, to set internal state (_joined_subscription)
        # that is later used to check whether metadata has changed since we joined a group
        # but there is no guarantee that this method, group_protocols, will get called
        # in the correct sequence or that it will only be called when we want it to be.
        # So this really should be moved elsewhere, but I don't have the energy to
        # work that out right now. If you read this at some later date after the mutable
        # state has bitten you... I'm sorry! It mimics the java client, and that's the
        # best I've got for now.
        self._joined_subscription = set(self._subscription.subscription)
        metadata_list = []
        for assignor in self.config['assignors']:
            metadata = assignor.metadata(self._joined_subscription)
            group_protocol = (assignor.name, metadata)
            metadata_list.append(group_protocol)
        return metadata_list

    def _handle_metadata_update(self, cluster):
        # if we encounter any unauthorized topics, raise an exception
        if cluster.unauthorized_topics:
            raise Errors.TopicAuthorizationFailedError(cluster.unauthorized_topics)

        if self._subscription.subscribed_pattern:
            topics = []
            for topic in cluster.topics(self.config['exclude_internal_topics']):
                if self._subscription.subscribed_pattern.match(topic):
                    topics.append(topic)

            if set(topics) != self._subscription.subscription:
                self._subscription.change_subscription(topics)
                self._client.set_topics(self._subscription.group_subscription())

        # check if there are any changes to the metadata which should trigger
        # a rebalance
        if self._subscription.partitions_auto_assigned():
            metadata_snapshot = self._build_metadata_snapshot(self._subscription, cluster)
            if self._metadata_snapshot != metadata_snapshot:
                self._metadata_snapshot = metadata_snapshot

                # If we haven't got group coordinator support,
                # just assign all partitions locally
                if self._auto_assign_all_partitions():
                    self._subscription.assign_from_subscribed([
                        TopicPartition(topic, partition)
                        for topic in self._subscription.subscription
                        for partition in self._metadata_snapshot[topic]
                    ])

    def _auto_assign_all_partitions(self):
        # For users that use "subscribe" without group support,
        # we will simply assign all partitions to this consumer
        if self.config['api_version'] < (0, 9):
            return True
        elif self.config['group_id'] is None:
            return True
        else:
            return False

    def _build_metadata_snapshot(self, subscription, cluster):
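        # Returns a dict mapping each topic in the group subscription to its
        # currently-known partition set, e.g. (illustrative topic name):
        #     {'my-topic': {0, 1, 2}}
        # Topics with no metadata yet map to an empty set.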
        metadata_snapshot = {}
        for topic in subscription.group_subscription():
            partitions = cluster.partitions_for_topic(topic) or []
            metadata_snapshot[topic] = set(partitions)
        return metadata_snapshot

    def _lookup_assignor(self, name):
        for assignor in self.config['assignors']:
            if assignor.name == name:
                return assignor
        return None

    def _on_join_complete(self, generation, member_id, protocol,
                          member_assignment_bytes):
        # only the leader is responsible for monitoring for metadata changes
        # (i.e. partition changes)
        if not self._is_leader:
            self._assignment_snapshot = None

        assignor = self._lookup_assignor(protocol)
        assert assignor, 'Coordinator selected invalid assignment protocol: %s' % (protocol,)

        assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes)

        # set the flag to refresh last committed offsets
        self._subscription.needs_fetch_committed_offsets = True

        # update partition assignment
        self._subscription.assign_from_subscribed(assignment.partitions())

        # give the assignor a chance to update internal state
        # based on the received assignment
        assignor.on_assignment(assignment)

        # reschedule the auto commit starting from now
        self.next_auto_commit_deadline = time.time() + self.auto_commit_interval

        assigned = set(self._subscription.assigned_partitions())
        log.info("Setting newly assigned partitions %s for group %s",
                 assigned, self.group_id)

        # execute the user's callback after rebalance
        if self._subscription.listener:
            try:
                self._subscription.listener.on_partitions_assigned(assigned)
            except Exception:
                log.exception("User provided listener %s for group %s"
                              " failed on partition assignment: %s",
                              self._subscription.listener, self.group_id,
                              assigned)

    def poll(self):
        """
        Poll for coordinator events. Only applicable if group_id is set, and
        broker version supports GroupCoordinators. This ensures that the
        coordinator is known, and if using automatic partition assignment,
        ensures that the consumer has joined the group. This also handles
        periodic offset commits if they are enabled.
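
        Example:
            Illustrative only; KafkaConsumer normally drives this call from
            its fetch loop rather than user code::

                coordinator.poll()
                timeout = coordinator.time_to_next_poll()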
        """
        if self.group_id is None or self.config['api_version'] < (0, 8, 2):
            return

        self._invoke_completed_offset_commit_callbacks()
        self.ensure_coordinator_ready()

        if self.config['api_version'] >= (0, 9) and self._subscription.partitions_auto_assigned():
            if self.need_rejoin():
                # due to a race condition between the initial metadata fetch and the
                # initial rebalance, we need to ensure that the metadata is fresh
                # before joining initially, and then request the metadata update. If
                # metadata update arrives while the rebalance is still pending (for
                # example, when the join group is still inflight), then we will lose
                # track of the fact that we need to rebalance again to reflect the
                # change to the topic subscription. Without ensuring that the
                # metadata is fresh, any metadata update that changes the topic
                # subscriptions and arrives while a rebalance is in progress will
                # essentially be ignored. See KAFKA-3949 for the complete
                # description of the problem.
                if self._subscription.subscribed_pattern:
                    metadata_update = self._client.cluster.request_update()
                    self._client.poll(future=metadata_update)

                self.ensure_active_group()

            self.poll_heartbeat()

        self._maybe_auto_commit_offsets_async()

    def time_to_next_poll(self):
        """Return seconds (float) remaining until :meth:`.poll` should be called again"""
        if not self.config['enable_auto_commit']:
            return self.time_to_next_heartbeat()

        if time.time() > self.next_auto_commit_deadline:
            return 0

        return min(self.next_auto_commit_deadline - time.time(),
                   self.time_to_next_heartbeat())

    def _perform_assignment(self, leader_id, assignment_strategy, members):
        assignor = self._lookup_assignor(assignment_strategy)
        assert assignor, 'Invalid assignment protocol: %s' % (assignment_strategy,)
        member_metadata = {}
        all_subscribed_topics = set()
        for member_id, metadata_bytes in members:
            metadata = ConsumerProtocol.METADATA.decode(metadata_bytes)
            member_metadata[member_id] = metadata
            all_subscribed_topics.update(metadata.subscription) # pylint: disable-msg=no-member
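
        # At this point member_metadata maps each member_id to its decoded
        # subscription metadata, and all_subscribed_topics is the union of
        # every member's topics, e.g. (illustrative): {'my-topic', 'other-topic'}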

        # the leader will begin watching for changes to any of the topics
        # the group is interested in, which ensures that all metadata changes
        # will eventually be seen
        # Because assignment typically happens within response callbacks,
        # we cannot block on metadata updates here (no recursion into poll())
        self._subscription.group_subscribe(all_subscribed_topics)
        self._client.set_topics(self._subscription.group_subscription())

        # keep track of the metadata used for assignment so that we can check
        # after rebalance completion whether anything has changed
        self._cluster.request_update()
        self._is_leader = True
        self._assignment_snapshot = self._metadata_snapshot

        log.debug("Performing assignment for group %s using strategy %s"
                  " with subscriptions %s", self.group_id, assignor.name,
                  member_metadata)

        assignments = assignor.assign(self._cluster, member_metadata)

        log.debug("Finished assignment for group %s: %s", self.group_id, assignments)

        group_assignment = {}
        for member_id, assignment in six.iteritems(assignments):
            group_assignment[member_id] = assignment
        return group_assignment

    def _on_join_prepare(self, generation, member_id):
        # commit offsets prior to rebalance if auto-commit enabled
        self._maybe_auto_commit_offsets_sync()

        # execute the user's callback before rebalance
        log.info("Revoking previously assigned partitions %s for group %s",
                 self._subscription.assigned_partitions(), self.group_id)
        if self._subscription.listener:
            try:
                revoked = set(self._subscription.assigned_partitions())
                self._subscription.listener.on_partitions_revoked(revoked)
            except Exception:
                log.exception("User provided subscription listener %s"
                              " for group %s failed on_partitions_revoked",
                              self._subscription.listener, self.group_id)

        self._is_leader = False
        self._subscription.reset_group_subscription()

    def need_rejoin(self):
        """Check whether the group should be rejoined

        Returns:
            bool: True if consumer should rejoin group, False otherwise
        """
        if not self._subscription.partitions_auto_assigned():
            return False

        if self._auto_assign_all_partitions():
            return False

        # we need to rejoin if we performed the assignment and metadata has changed
        if (self._assignment_snapshot is not None
            and self._assignment_snapshot != self._metadata_snapshot):
            return True

        # we need to join if our subscription has changed since the last join
        if (self._joined_subscription is not None
            and self._joined_subscription != self._subscription.subscription):
            return True

        return super(ConsumerCoordinator, self).need_rejoin()

    def refresh_committed_offsets_if_needed(self):
        """Fetch committed offsets for assigned partitions."""
        if self._subscription.needs_fetch_committed_offsets:
            offsets = self.fetch_committed_offsets(self._subscription.assigned_partitions())
            for partition, offset in six.iteritems(offsets):
                # verify assignment is still active
                if self._subscription.is_assigned(partition):
                    self._subscription.assignment[partition].committed = offset.offset
            self._subscription.needs_fetch_committed_offsets = False

    def fetch_committed_offsets(self, partitions):
        """Fetch the current committed offsets for specified partitions

        Arguments:
            partitions (list of TopicPartition): partitions to fetch

        Returns:
            dict: {TopicPartition: OffsetAndMetadata}
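
        Example:
            A minimal sketch; the topic, partition, and returned offset are
            assumed for illustration::

                committed = coordinator.fetch_committed_offsets(
                    [TopicPartition('my-topic', 0)])
                # e.g. {TopicPartition('my-topic', 0): OffsetAndMetadata(42, '')}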
        """
        if not partitions:
            return {}

        while True:
            self.ensure_coordinator_ready()

            # contact coordinator to fetch committed offsets
            future = self._send_offset_fetch_request(partitions)
            self._client.poll(future=future)

            if future.succeeded():
                return future.value

            if not future.retriable():
                raise future.exception # pylint: disable-msg=raising-bad-type

            time.sleep(self.config['retry_backoff_ms'] / 1000)

    def close(self, autocommit=True):
        """Close the coordinator, leave the current group,
        and reset local generation / member_id.

        Keyword Arguments:
            autocommit (bool): If auto-commit is configured for this consumer,
                this optional flag causes the consumer to attempt to commit any
                pending consumed offsets prior to close. Default: True
        """
        try:
            if autocommit:
                self._maybe_auto_commit_offsets_sync()
        finally:
            super(ConsumerCoordinator, self).close()

    def _invoke_completed_offset_commit_callbacks(self):
        while self.completed_offset_commits:
            callback, offsets, exception = self.completed_offset_commits.popleft()
            callback(offsets, exception)

    def commit_offsets_async(self, offsets, callback=None):
        """Commit specific offsets asynchronously.

        Arguments:
            offsets (dict {TopicPartition: OffsetAndMetadata}): what to commit
            callback (callable, optional): called as callback(offsets, response);
                response will be the Exception that caused the commit to fail,
                or None if the commit succeeded. This callback can be used to
                trigger custom actions when a commit request completes.

        Returns:
            kafka.future.Future
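
        Example:
            A minimal sketch; ``on_commit`` and the offset values are assumed
            for illustration::

                def on_commit(offsets, response):
                    if isinstance(response, Exception):
                        log.error('Commit failed: %s', response)

                coordinator.commit_offsets_async(
                    {TopicPartition('my-topic', 0): OffsetAndMetadata(42, '')},
                    callback=on_commit)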
        """
        self._invoke_completed_offset_commit_callbacks()
        if not self.coordinator_unknown():
            future = self._do_commit_offsets_async(offsets, callback)
        else:
            # we don't know the current coordinator, so try to find it and then
            # send the commit or fail (we don't want recursive retries which can
            # cause offset commits to arrive out of order). Note that there may
            # be multiple offset commits chained to the same coordinator lookup
            # request. This is fine because the listeners will be invoked in the
            # same order that they were added. Note also that BaseCoordinator
            # prevents multiple concurrent coordinator lookup requests.
            future = self.lookup_coordinator()
            future.add_callback(lambda r: functools.partial(self._do_commit_offsets_async, offsets, callback)())
            if callback:
                future.add_errback(lambda e: self.completed_offset_commits.appendleft((callback, offsets, e)))

        # ensure the commit has a chance to be transmitted (without blocking on
        # its completion). Note that commits are treated as heartbeats by the
        # coordinator, so there is no need to explicitly allow heartbeats
        # through delayed task execution.
        self._client.poll(timeout_ms=0) # no wakeup if we add that feature

        return future

    def _do_commit_offsets_async(self, offsets, callback=None):
        assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
        assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
        assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
                       offsets.values()))
        if callback is None:
            callback = self.config['default_offset_commit_callback']
        self._subscription.needs_fetch_committed_offsets = True
        future = self._send_offset_commit_request(offsets)
        future.add_both(lambda res: self.completed_offset_commits.appendleft((callback, offsets, res)))
        return future

    def commit_offsets_sync(self, offsets):
        """Commit specific offsets synchronously.

        This method will retry until the commit completes successfully or an
        unrecoverable error is encountered.

        Arguments:
            offsets (dict {TopicPartition: OffsetAndMetadata}): what to commit

        Raises error on failure
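
        Example:
            A minimal sketch with assumed topic, partition, and offset values::

                coordinator.commit_offsets_sync(
                    {TopicPartition('my-topic', 0): OffsetAndMetadata(42, '')})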
        """
        assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
        assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
        assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
                       offsets.values()))
        self._invoke_completed_offset_commit_callbacks()
        if not offsets:
            return

        while True:
            self.ensure_coordinator_ready()

            future = self._send_offset_commit_request(offsets)
            self._client.poll(future=future)

            if future.succeeded():
                return future.value

            if not future.retriable():
                raise future.exception # pylint: disable-msg=raising-bad-type

            time.sleep(self.config['retry_backoff_ms'] / 1000)

    def _maybe_auto_commit_offsets_sync(self):
        if self.config['enable_auto_commit']:
            try:
                self.commit_offsets_sync(self._subscription.all_consumed_offsets())

            # The three main group membership errors are known and should not
            # require a stacktrace -- just a warning
            except (Errors.UnknownMemberIdError,
                    Errors.IllegalGenerationError,
                    Errors.RebalanceInProgressError):
                log.warning("Offset commit failed: group membership out of date."
                            " This is likely to cause duplicate message"
                            " delivery.")
            except Exception:
                log.exception("Offset commit failed: This is likely to cause"
                              " duplicate message delivery")

    def _send_offset_commit_request(self, offsets):
        """Commit offsets for the specified list of topics and partitions.

        This is a non-blocking call which returns a request future that can be
        polled in the case of a synchronous commit or ignored in the
        asynchronous case.

        Arguments:
            offsets (dict of {TopicPartition: OffsetAndMetadata}): what should
                be committed

        Returns:
            Future: indicating whether the commit was successful or not
        """
        assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
        assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
        assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
                       offsets.values()))
        if not offsets:
            log.debug('No offsets to commit')
            return Future().success(None)

        node_id = self.coordinator()
        if node_id is None:
            return Future().failure(Errors.GroupCoordinatorNotAvailableError)

        # create the offset commit request
        offset_data = collections.defaultdict(dict)
        for tp, offset in six.iteritems(offsets):
            offset_data[tp.topic][tp.partition] = offset
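
        # offset_data now groups the flat offsets dict by topic, e.g.
        # (illustrative values): {'my-topic': {0: OffsetAndMetadata(42, '')}}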

        if self._subscription.partitions_auto_assigned():
            generation = self.generation()
        else:
            generation = Generation.NO_GENERATION

        # if the generation is None, we are not part of an active group
        # (and we expect to be). The only thing we can do is fail the commit
        # and let the user rejoin the group in poll()
        if self.config['api_version'] >= (0, 9) and generation is None:
            return Future().failure(Errors.CommitFailedError())

        if self.config['api_version'] >= (0, 9):
            request = OffsetCommitRequest[2](
                self.group_id,
                generation.generation_id,
                generation.member_id,
                OffsetCommitRequest[2].DEFAULT_RETENTION_TIME,
                [(
                    topic, [(
                        partition,
                        offset.offset,
                        offset.metadata
                    ) for partition, offset in six.iteritems(partitions)]
                ) for topic, partitions in six.iteritems(offset_data)]
            )
        elif self.config['api_version'] >= (0, 8, 2):
            request = OffsetCommitRequest[1](
                self.group_id, -1, '',
                [(
                    topic, [(
                        partition,
                        offset.offset,
                        -1,
                        offset.metadata
                    ) for partition, offset in six.iteritems(partitions)]
                ) for topic, partitions in six.iteritems(offset_data)]
            )
        elif self.config['api_version'] >= (0, 8, 1):
            request = OffsetCommitRequest[0](
                self.group_id,
                [(
                    topic, [(
                        partition,
                        offset.offset,
                        offset.metadata
                    ) for partition, offset in six.iteritems(partitions)]
                ) for topic, partitions in six.iteritems(offset_data)]
            )

        log.debug("Sending offset-commit request with %s for group %s to %s",
                  offsets, self.group_id, node_id)

        future = Future()
        _f = self._client.send(node_id, request)
        _f.add_callback(self._handle_offset_commit_response, offsets, future, time.time())
        _f.add_errback(self._failed_request, node_id, request, future)
        return future

    def _handle_offset_commit_response(self, offsets, future, send_time, response):
        # TODO look at adding request_latency_ms to response (like java kafka)
        self.consumer_sensors.commit_latency.record((time.time() - send_time) * 1000)
        unauthorized_topics = set()

        for topic, partitions in response.topics:
            for partition, error_code in partitions:
                tp = TopicPartition(topic, partition)
                offset = offsets[tp]

                error_type = Errors.for_code(error_code)
                if error_type is Errors.NoError:
                    log.debug("Group %s committed offset %s for partition %s",
                              self.group_id, offset, tp)
                    if self._subscription.is_assigned(tp):
                        self._subscription.assignment[tp].committed = offset.offset
                elif error_type is Errors.GroupAuthorizationFailedError:
                    log.error("Not authorized to commit offsets for group %s",
                              self.group_id)
                    future.failure(error_type(self.group_id))
                    return
                elif error_type is Errors.TopicAuthorizationFailedError:
                    unauthorized_topics.add(topic)
                elif error_type in (Errors.OffsetMetadataTooLargeError,
                                    Errors.InvalidCommitOffsetSizeError):
                    # raise the error to the user
                    log.debug("OffsetCommit for group %s failed on partition %s"
                              " %s", self.group_id, tp, error_type.__name__)
                    future.failure(error_type())
                    return
                elif error_type is Errors.GroupLoadInProgressError:
                    # just retry
                    log.debug("OffsetCommit for group %s failed: %s",
                              self.group_id, error_type.__name__)
                    future.failure(error_type(self.group_id))
                    return
                elif error_type in (Errors.GroupCoordinatorNotAvailableError,
                                    Errors.NotCoordinatorForGroupError,
                                    Errors.RequestTimedOutError):
                    log.debug("OffsetCommit for group %s failed: %s",
                              self.group_id, error_type.__name__)
                    self.coordinator_dead(error_type())
                    future.failure(error_type(self.group_id))
                    return
                elif error_type in (Errors.UnknownMemberIdError,
                                    Errors.IllegalGenerationError,
                                    Errors.RebalanceInProgressError):
                    # need to re-join group
                    error = error_type(self.group_id)
                    log.debug("OffsetCommit for group %s failed: %s",
                              self.group_id, error)
                    self.reset_generation()
                    future.failure(Errors.CommitFailedError())
                    return
                else:
                    log.error("Group %s failed to commit partition %s at offset"
                              " %s: %s", self.group_id, tp, offset,
                              error_type.__name__)
                    future.failure(error_type())
                    return

        if unauthorized_topics:
            log.error("Not authorized to commit to topics %s for group %s",
                      unauthorized_topics, self.group_id)
            future.failure(Errors.TopicAuthorizationFailedError(unauthorized_topics))
        else:
            future.success(None)

    def _send_offset_fetch_request(self, partitions):
        """Fetch the committed offsets for a set of partitions.

        This is a non-blocking call. The returned future can be polled to get
        the actual offsets returned from the broker.

        Arguments:
            partitions (list of TopicPartition): the partitions to fetch

        Returns:
            Future: resolves to dict of offsets: {TopicPartition: OffsetAndMetadata}
        """
        assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
        assert all(map(lambda k: isinstance(k, TopicPartition), partitions))
        if not partitions:
            return Future().success({})

        node_id = self.coordinator()
        if node_id is None:
            return Future().failure(Errors.GroupCoordinatorNotAvailableError)

        # Verify node is ready
        if not self._client.ready(node_id):
            log.debug("Node %s not ready -- failing offset fetch request",
                      node_id)
            return Future().failure(Errors.NodeNotReadyError)

        log.debug("Group %s fetching committed offsets for partitions: %s",
                  self.group_id, partitions)
        # construct the request
        topic_partitions = collections.defaultdict(set)
        for tp in partitions:
            topic_partitions[tp.topic].add(tp.partition)
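
        # topic_partitions now maps each topic to the set of partition ids to
        # fetch, e.g. (illustrative values): {'my-topic': {0, 1}}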

        if self.config['api_version'] >= (0, 8, 2):
            request = OffsetFetchRequest[1](
                self.group_id,
                list(topic_partitions.items())
            )
        else:
            request = OffsetFetchRequest[0](
                self.group_id,
                list(topic_partitions.items())
            )

        # send the request with a callback
        future = Future()
        _f = self._client.send(node_id, request)
        _f.add_callback(self._handle_offset_fetch_response, future)
        _f.add_errback(self._failed_request, node_id, request, future)
        return future

    def _handle_offset_fetch_response(self, future, response):
        offsets = {}
        for topic, partitions in response.topics:
            for partition, offset, metadata, error_code in partitions:
                tp = TopicPartition(topic, partition)
                error_type = Errors.for_code(error_code)
                if error_type is not Errors.NoError:
                    error = error_type()
                    log.debug("Group %s failed to fetch offset for partition"
                              " %s: %s", self.group_id, tp, error)
                    if error_type is Errors.GroupLoadInProgressError:
                        # just retry
                        future.failure(error)
                    elif error_type is Errors.NotCoordinatorForGroupError:
                        # re-discover the coordinator and retry
                        self.coordinator_dead(error_type())
                        future.failure(error)
                    elif error_type is Errors.UnknownTopicOrPartitionError:
                        log.warning("OffsetFetchRequest -- unknown topic %s"
                                    " (have you committed any offsets yet?)",
                                    topic)
                        continue
                    else:
                        log.error("Unknown error fetching offsets for %s: %s",
                                  tp, error)
                        future.failure(error)
                    return
                elif offset >= 0:
                    # record the position with the offset
                    # (-1 indicates no committed offset to fetch)
                    offsets[tp] = OffsetAndMetadata(offset, metadata)
                else:
                    log.debug("Group %s has no committed offset for partition"
                              " %s", self.group_id, tp)
        future.success(offsets)

    def _default_offset_commit_callback(self, offsets, exception):
        if exception is not None:
            log.error("Offset commit failed: %s", exception)

    def _commit_offsets_async_on_complete(self, offsets, exception):
        if exception is not None:
            log.warning("Auto offset commit failed for group %s: %s",
                        self.group_id, exception)
            if getattr(exception, 'retriable', False):
                self.next_auto_commit_deadline = min(
                    time.time() + self.config['retry_backoff_ms'] / 1000,
                    self.next_auto_commit_deadline)
        else:
            log.debug("Completed autocommit of offsets %s for group %s",
                      offsets, self.group_id)

    def _maybe_auto_commit_offsets_async(self):
        if self.config['enable_auto_commit']:
            if self.coordinator_unknown():
                self.next_auto_commit_deadline = time.time() + self.config['retry_backoff_ms'] / 1000
            elif time.time() > self.next_auto_commit_deadline:
                self.next_auto_commit_deadline = time.time() + self.auto_commit_interval
                self.commit_offsets_async(self._subscription.all_consumed_offsets(),
                                          self._commit_offsets_async_on_complete)


class ConsumerCoordinatorMetrics(object):
    def __init__(self, metrics, metric_group_prefix, subscription):
        self.metrics = metrics
        self.metric_group_name = '%s-coordinator-metrics' % (metric_group_prefix,)

        self.commit_latency = metrics.sensor('commit-latency')
        self.commit_latency.add(metrics.metric_name(
            'commit-latency-avg', self.metric_group_name,
            'The average time taken for a commit request'), Avg())
        self.commit_latency.add(metrics.metric_name(
            'commit-latency-max', self.metric_group_name,
            'The max time taken for a commit request'), Max())
        self.commit_latency.add(metrics.metric_name(
            'commit-rate', self.metric_group_name,
            'The number of commit calls per second'), Rate(sampled_stat=Count()))

        num_parts = AnonMeasurable(lambda config, now:
                                   len(subscription.assigned_partitions()))
        metrics.add_metric(metrics.metric_name(
            'assigned-partitions', self.metric_group_name,
            'The number of partitions currently assigned to this consumer'),
            num_parts)