1from __future__ import absolute_import 2 3import atexit 4import logging 5import numbers 6from threading import Lock 7import warnings 8 9from kafka.errors import ( 10 UnknownTopicOrPartitionError, check_error, KafkaError) 11from kafka.structs import ( 12 OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload) 13from kafka.util import ReentrantTimer 14 15 16log = logging.getLogger('kafka.consumer') 17 18AUTO_COMMIT_MSG_COUNT = 100 19AUTO_COMMIT_INTERVAL = 5000 20 21FETCH_DEFAULT_BLOCK_TIMEOUT = 1 22FETCH_MAX_WAIT_TIME = 100 23FETCH_MIN_BYTES = 4096 24FETCH_BUFFER_SIZE_BYTES = 4096 25MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8 26 27ITER_TIMEOUT_SECONDS = 60 28NO_MESSAGES_WAIT_TIME_SECONDS = 0.1 29FULL_QUEUE_WAIT_TIME_SECONDS = 0.1 30 31MAX_BACKOFF_SECONDS = 60 32 33class Consumer(object): 34 """ 35 Base class to be used by other consumers. Not to be used directly 36 37 This base class provides logic for 38 39 * initialization and fetching metadata of partitions 40 * Auto-commit logic 41 * APIs for fetching pending message count 42 43 """ 44 def __init__(self, client, group, topic, partitions=None, auto_commit=True, 45 auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, 46 auto_commit_every_t=AUTO_COMMIT_INTERVAL): 47 48 warnings.warn('deprecated -- this class will be removed in a future' 49 ' release. Use KafkaConsumer instead.', 50 DeprecationWarning) 51 self.client = client 52 self.topic = topic 53 self.group = group 54 self.client.load_metadata_for_topics(topic, ignore_leadernotavailable=True) 55 self.offsets = {} 56 57 if partitions is None: 58 partitions = self.client.get_partition_ids_for_topic(topic) 59 else: 60 assert all(isinstance(x, numbers.Integral) for x in partitions) 61 62 # Variables for handling offset commits 63 self.commit_lock = Lock() 64 self.commit_timer = None 65 self.count_since_commit = 0 66 self.auto_commit = auto_commit 67 self.auto_commit_every_n = auto_commit_every_n 68 self.auto_commit_every_t = auto_commit_every_t 69 70 # Set up the auto-commit timer 71 if auto_commit is True and auto_commit_every_t is not None: 72 self.commit_timer = ReentrantTimer(auto_commit_every_t, 73 self.commit) 74 self.commit_timer.start() 75 76 # Set initial offsets 77 if self.group is not None: 78 self.fetch_last_known_offsets(partitions) 79 else: 80 for partition in partitions: 81 self.offsets[partition] = 0 82 83 # Register a cleanup handler 84 def cleanup(obj): 85 obj.stop() 86 self._cleanup_func = cleanup 87 atexit.register(cleanup, self) 88 89 self.partition_info = False # Do not return partition info in msgs 90 91 def provide_partition_info(self): 92 """ 93 Indicates that partition info must be returned by the consumer 94 """ 95 self.partition_info = True 96 97 def fetch_last_known_offsets(self, partitions=None): 98 if self.group is None: 99 raise ValueError('SimpleClient.group must not be None') 100 101 if partitions is None: 102 partitions = self.client.get_partition_ids_for_topic(self.topic) 103 104 responses = self.client.send_offset_fetch_request( 105 self.group, 106 [OffsetFetchRequestPayload(self.topic, p) for p in partitions], 107 fail_on_error=False 108 ) 109 110 for resp in responses: 111 try: 112 check_error(resp) 113 # API spec says server won't set an error here 114 # but 0.8.1.1 does actually... 115 except UnknownTopicOrPartitionError: 116 pass 117 118 # -1 offset signals no commit is currently stored 119 if resp.offset == -1: 120 self.offsets[resp.partition] = 0 121 122 # Otherwise we committed the stored offset 123 # and need to fetch the next one 124 else: 125 self.offsets[resp.partition] = resp.offset 126 127 def commit(self, partitions=None): 128 """Commit stored offsets to Kafka via OffsetCommitRequest (v0) 129 130 Keyword Arguments: 131 partitions (list): list of partitions to commit, default is to commit 132 all of them 133 134 Returns: True on success, False on failure 135 """ 136 137 # short circuit if nothing happened. This check is kept outside 138 # to prevent un-necessarily acquiring a lock for checking the state 139 if self.count_since_commit == 0: 140 return 141 142 with self.commit_lock: 143 # Do this check again, just in case the state has changed 144 # during the lock acquiring timeout 145 if self.count_since_commit == 0: 146 return 147 148 reqs = [] 149 if partitions is None: # commit all partitions 150 partitions = list(self.offsets.keys()) 151 152 log.debug('Committing new offsets for %s, partitions %s', 153 self.topic, partitions) 154 for partition in partitions: 155 offset = self.offsets[partition] 156 log.debug('Commit offset %d in SimpleConsumer: ' 157 'group=%s, topic=%s, partition=%s', 158 offset, self.group, self.topic, partition) 159 160 reqs.append(OffsetCommitRequestPayload(self.topic, partition, 161 offset, None)) 162 163 try: 164 self.client.send_offset_commit_request(self.group, reqs) 165 except KafkaError as e: 166 log.error('%s saving offsets: %s', e.__class__.__name__, e) 167 return False 168 else: 169 self.count_since_commit = 0 170 return True 171 172 def _auto_commit(self): 173 """ 174 Check if we have to commit based on number of messages and commit 175 """ 176 177 # Check if we are supposed to do an auto-commit 178 if not self.auto_commit or self.auto_commit_every_n is None: 179 return 180 181 if self.count_since_commit >= self.auto_commit_every_n: 182 self.commit() 183 184 def stop(self): 185 if self.commit_timer is not None: 186 self.commit_timer.stop() 187 self.commit() 188 189 if hasattr(self, '_cleanup_func'): 190 # Remove cleanup handler now that we've stopped 191 192 # py3 supports unregistering 193 if hasattr(atexit, 'unregister'): 194 atexit.unregister(self._cleanup_func) # pylint: disable=no-member 195 196 # py2 requires removing from private attribute... 197 else: 198 199 # ValueError on list.remove() if the exithandler no longer 200 # exists is fine here 201 try: 202 atexit._exithandlers.remove( # pylint: disable=no-member 203 (self._cleanup_func, (self,), {})) 204 except ValueError: 205 pass 206 207 del self._cleanup_func 208 209 def pending(self, partitions=None): 210 """ 211 Gets the pending message count 212 213 Keyword Arguments: 214 partitions (list): list of partitions to check for, default is to check all 215 """ 216 if partitions is None: 217 partitions = self.offsets.keys() 218 219 total = 0 220 reqs = [] 221 222 for partition in partitions: 223 reqs.append(OffsetRequestPayload(self.topic, partition, -1, 1)) 224 225 resps = self.client.send_offset_request(reqs) 226 for resp in resps: 227 partition = resp.partition 228 pending = resp.offsets[0] 229 offset = self.offsets[partition] 230 total += pending - offset 231 232 return total 233