from __future__ import absolute_import

import atexit
import logging
import numbers
from threading import Lock
import warnings

from kafka.errors import (
    UnknownTopicOrPartitionError, check_error, KafkaError)
from kafka.structs import (
    OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload)
from kafka.util import ReentrantTimer


log = logging.getLogger('kafka.consumer')
AUTO_COMMIT_MSG_COUNT = 100   # auto-commit after this many messages
AUTO_COMMIT_INTERVAL = 5000   # auto-commit interval, in milliseconds

FETCH_DEFAULT_BLOCK_TIMEOUT = 1
FETCH_MAX_WAIT_TIME = 100
FETCH_MIN_BYTES = 4096
FETCH_BUFFER_SIZE_BYTES = 4096
MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8

ITER_TIMEOUT_SECONDS = 60
NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
FULL_QUEUE_WAIT_TIME_SECONDS = 0.1

MAX_BACKOFF_SECONDS = 60


class Consumer(object):
    """
    Base class to be used by other consumers. Not to be used directly.

    This base class provides logic for:

    * initialization and fetching metadata of partitions
    * auto-commit logic
    * APIs for fetching the pending message count

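    Example (illustrative only; ``SimpleConsumer`` is a subclass of this base
    class, and the broker address, group, and topic below are placeholders)::

        from kafka import SimpleClient, SimpleConsumer

        client = SimpleClient('localhost:9092')
        consumer = SimpleConsumer(client, 'my-group', 'my-topic')
        for message in consumer:
            print(message)
        consumer.stop()
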
    """
    def __init__(self, client, group, topic, partitions=None, auto_commit=True,
                 auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                 auto_commit_every_t=AUTO_COMMIT_INTERVAL):
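        """
        Arguments:
            client: a SimpleClient instance used to talk to the cluster
            group: consumer group name used to commit and fetch offsets;
                if None, initial offsets default to 0 and are not fetched
            topic: the topic to consume from

        Keyword Arguments:
            partitions: optional list of partition ids to consume; defaults
                to all partitions of the topic
            auto_commit: enable periodic offset auto-commit (default True)
            auto_commit_every_n: auto-commit after this many messages
            auto_commit_every_t: auto-commit interval, in milliseconds
        """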

        warnings.warn('deprecated -- this class will be removed in a future'
                      ' release. Use KafkaConsumer instead.',
                      DeprecationWarning)
        self.client = client
        self.topic = topic
        self.group = group
        self.client.load_metadata_for_topics(topic, ignore_leadernotavailable=True)
        self.offsets = {}

        if partitions is None:
            partitions = self.client.get_partition_ids_for_topic(topic)
        else:
            assert all(isinstance(x, numbers.Integral) for x in partitions)

        # Variables for handling offset commits
        self.commit_lock = Lock()
        self.commit_timer = None
        self.count_since_commit = 0
        self.auto_commit = auto_commit
        self.auto_commit_every_n = auto_commit_every_n
        self.auto_commit_every_t = auto_commit_every_t

        # Set up the auto-commit timer
        if auto_commit is True and auto_commit_every_t is not None:
            self.commit_timer = ReentrantTimer(auto_commit_every_t,
                                               self.commit)
            self.commit_timer.start()

        # Set initial offsets
        if self.group is not None:
            self.fetch_last_known_offsets(partitions)
        else:
            for partition in partitions:
                self.offsets[partition] = 0

        # Register a cleanup handler
        def cleanup(obj):
            obj.stop()
        self._cleanup_func = cleanup
        atexit.register(cleanup, self)

        self.partition_info = False     # Do not return partition info in msgs

    def provide_partition_info(self):
        """
        Indicates that partition info should be returned by the consumer
        along with each message
        """
        self.partition_info = True

    def fetch_last_known_offsets(self, partitions=None):
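        """Fetch the last committed offsets for this consumer group from
        Kafka and use them to seed ``self.offsets``. Partitions with no
        stored offset (the broker returns -1) start at offset 0.
        """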
        if self.group is None:
            raise ValueError('SimpleClient.group must not be None')

        if partitions is None:
            partitions = self.client.get_partition_ids_for_topic(self.topic)

        responses = self.client.send_offset_fetch_request(
            self.group,
            [OffsetFetchRequestPayload(self.topic, p) for p in partitions],
            fail_on_error=False
        )

        for resp in responses:
            try:
                check_error(resp)
            # API spec says server won't set an error here
            # but 0.8.1.1 does actually...
            except UnknownTopicOrPartitionError:
                pass

            # -1 offset signals no commit is currently stored
            if resp.offset == -1:
                self.offsets[resp.partition] = 0

            # Otherwise we committed the stored offset
            # and need to fetch the next one
            else:
                self.offsets[resp.partition] = resp.offset

    def commit(self, partitions=None):
        """Commit stored offsets to Kafka via OffsetCommitRequest (v0)

        Keyword Arguments:
            partitions (list): list of partitions to commit; default is to
                commit all of them

        Returns: True on success, False on failure, or None if there was
            nothing to commit
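
        Example (hypothetical ``consumer`` instance)::

            consumer.commit()                # commit all partitions
            consumer.commit(partitions=[0])  # commit a single partition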
        """

        # Short circuit if nothing happened. This check is kept outside
        # the lock to avoid acquiring it just to check the state
        if self.count_since_commit == 0:
            return

        with self.commit_lock:
            # Do this check again, in case the state changed while we
            # were waiting to acquire the lock
            if self.count_since_commit == 0:
                return

            reqs = []
            if partitions is None:  # commit all partitions
                partitions = list(self.offsets.keys())

            log.debug('Committing new offsets for %s, partitions %s',
                      self.topic, partitions)
            for partition in partitions:
                offset = self.offsets[partition]
                log.debug('Commit offset %d in SimpleConsumer: '
                          'group=%s, topic=%s, partition=%s',
                          offset, self.group, self.topic, partition)

                reqs.append(OffsetCommitRequestPayload(self.topic, partition,
                                                       offset, None))

            try:
                self.client.send_offset_commit_request(self.group, reqs)
            except KafkaError as e:
                log.error('%s saving offsets: %s', e.__class__.__name__, e)
                return False
            else:
                self.count_since_commit = 0
                return True

    def _auto_commit(self):
        """
        Commit if auto-commit is enabled and enough messages have been
        consumed since the last commit
        """

        # Check if we are supposed to do an auto-commit
        if not self.auto_commit or self.auto_commit_every_n is None:
            return

        if self.count_since_commit >= self.auto_commit_every_n:
            self.commit()

    def stop(self):
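        """Stop the auto-commit timer (committing any outstanding offsets
        if auto-commit is enabled) and unregister the atexit cleanup handler.
        """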
        if self.commit_timer is not None:
            self.commit_timer.stop()
            self.commit()

        if hasattr(self, '_cleanup_func'):
            # Remove cleanup handler now that we've stopped

            # py3 supports unregistering
            if hasattr(atexit, 'unregister'):
                atexit.unregister(self._cleanup_func)  # pylint: disable=no-member

            # py2 requires removing from private attribute...
            else:
                # ValueError on list.remove() if the exithandler no longer
                # exists is fine here
                try:
                    atexit._exithandlers.remove(  # pylint: disable=no-member
                        (self._cleanup_func, (self,), {}))
                except ValueError:
                    pass

            del self._cleanup_func

    def pending(self, partitions=None):
        """
        Gets the number of messages that have been produced but not yet
        consumed (the pending message count)

        Keyword Arguments:
            partitions (list): list of partitions to check; default is to
                check all
        """
        if partitions is None:
            partitions = self.offsets.keys()

        total = 0
        reqs = []

        for partition in partitions:
            reqs.append(OffsetRequestPayload(self.topic, partition, -1, 1))

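        # time=-1 in each OffsetRequestPayload asks for the partition's latest
        # (log-end) offset; pending = log-end offset - current consumer offset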
        resps = self.client.send_offset_request(reqs)
        for resp in resps:
            partition = resp.partition
            pending = resp.offsets[0]
            offset = self.offsets[partition]
            total += pending - offset

        return total