from __future__ import absolute_import

import collections
import copy
import logging
import threading
import time

from kafka.vendor import six

from kafka import errors as Errors
from kafka.conn import collect_hosts, dns_lookup
from kafka.future import Future
from kafka.structs import BrokerMetadata, PartitionMetadata, TopicPartition

log = logging.getLogger(__name__)


class ClusterMetadata(object):
    """
    A class to manage kafka cluster metadata.

    Apart from resolving bootstrap server addresses (dns_lookup, performed
    lazily by broker_metadata('bootstrap')), this class does not perform any
    IO. It simply updates internal state given API responses
    (MetadataResponse, GroupCoordinatorResponse).

    Keyword Arguments:
        retry_backoff_ms (int): Milliseconds to backoff when retrying on
            errors. Default: 100.
        metadata_max_age_ms (int): The period of time in milliseconds after
            which we force a refresh of metadata even if we haven't seen any
            partition leadership changes to proactively discover any new
            brokers or partitions. Default: 300000
        bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
            strings) that the client should contact to bootstrap initial
            cluster metadata. This does not have to be the full node list.
            It just needs to have at least one broker that will respond to a
            Metadata API Request. Default port is 9092. If no servers are
            specified, will default to localhost:9092.
    """
    DEFAULT_CONFIG = {
        'retry_backoff_ms': 100,
        'metadata_max_age_ms': 300000,
        'bootstrap_servers': 'localhost',
    }

    def __init__(self, **configs):
        self._brokers = {}  # node_id -> BrokerMetadata
        self._partitions = {}  # topic -> partition -> PartitionMetadata
        self._broker_partitions = collections.defaultdict(set)  # node_id -> {TopicPartition...}
        self._groups = {}  # group_name -> node_id
        self._last_refresh_ms = 0
        self._last_successful_refresh_ms = 0
        self._need_update = True
        self._future = None  # pending Future returned by request_update()
        self._listeners = set()
        self._lock = threading.Lock()
        self.need_all_topic_metadata = False
        self.unauthorized_topics = set()
        self.internal_topics = set()
        self.controller = None

        # Only keys present in DEFAULT_CONFIG are honoured; others are ignored.
        self.config = copy.copy(self.DEFAULT_CONFIG)
        for key in self.config:
            if key in configs:
                self.config[key] = configs[key]

        self._bootstrap_brokers = self._generate_bootstrap_brokers()

    def _generate_bootstrap_brokers(self):
        """Endlessly yield bootstrap BrokerMetadata, cycling over all hosts.

        Each pass re-resolves every configured bootstrap host with
        dns_lookup() and yields one BrokerMetadata (node_id 'bootstrap') per
        returned socket address. If a whole pass yields no address at all
        (every lookup returned nothing, or no hosts are configured), None is
        yielded for that pass so callers of next() never block forever.
        """
        # collect_hosts does not perform DNS, so we should be fine to re-use
        bootstrap_hosts = collect_hosts(self.config['bootstrap_servers'])

        while True:
            yielded_any = False
            for host, port, afi in bootstrap_hosts:
                for _, __, ___, ____, sockaddr in dns_lookup(host, port, afi):
                    yielded_any = True
                    yield BrokerMetadata('bootstrap', sockaddr[0], sockaddr[1], None)
            if not yielded_any:
                # Without this the `while True` would spin without ever
                # yielding, hanging broker_metadata('bootstrap').
                yield None

    def brokers(self):
        """Get all BrokerMetadata

        Returns:
            set: {BrokerMetadata, ...}
        """
        return set(self._brokers.values())

    def broker_metadata(self, broker_id):
        """Get BrokerMetadata

        Arguments:
            broker_id (int): node_id for a broker to check. The special id
                'bootstrap' returns the next resolved bootstrap address.

        Returns:
            BrokerMetadata or None if not found (for 'bootstrap': None if no
            bootstrap address could be resolved on the current pass)
        """
        if broker_id == 'bootstrap':
            return next(self._bootstrap_brokers)

        return self._brokers.get(broker_id)

    def partitions_for_topic(self, topic):
        """Return set of all partitions for topic (whether available or not)

        Arguments:
            topic (str): topic to check for partitions

        Returns:
            set: {partition (int), ...}
            None if topic not found.
        """
        if topic not in self._partitions:
            return None
        return set(self._partitions[topic].keys())

    def available_partitions_for_topic(self, topic):
        """Return set of partitions with known leaders

        Arguments:
            topic (str): topic to check for partitions

        Returns:
            set: {partition (int), ...}
            None if topic not found.
        """
        if topic not in self._partitions:
            return None
        # A leader of -1 marks a partition with no available leader.
        return {partition for partition, metadata
                in six.iteritems(self._partitions[topic])
                if metadata.leader != -1}

    def leader_for_partition(self, partition):
        """Return node_id of leader, -1 unavailable, None if unknown.

        Arguments:
            partition (TopicPartition): the topic/partition to look up
        """
        topic_partitions = self._partitions.get(partition.topic)
        if topic_partitions is None:
            return None
        elif partition.partition not in topic_partitions:
            return None
        return topic_partitions[partition.partition].leader

    def partitions_for_broker(self, broker_id):
        """Return TopicPartitions for which the broker is a leader.

        Arguments:
            broker_id (int): node id for a broker

        Returns:
            set: {TopicPartition, ...}
            None if the broker either has no partitions or does not exist.
        """
        # .get() on the defaultdict does not create an empty entry.
        return self._broker_partitions.get(broker_id)

    def coordinator_for_group(self, group):
        """Return node_id of group coordinator.

        Arguments:
            group (str): name of consumer group

        Returns:
            int: node_id for group coordinator
            None if the group does not exist.
        """
        return self._groups.get(group)

    def ttl(self):
        """Milliseconds until metadata should be refreshed

        Returns 0 when an update has been requested and the retry backoff has
        elapsed; otherwise the larger of the remaining metadata age and the
        remaining retry backoff (never negative).
        """
        now = time.time() * 1000
        if self._need_update:
            ttl = 0
        else:
            metadata_age = now - self._last_successful_refresh_ms
            ttl = self.config['metadata_max_age_ms'] - metadata_age

        retry_age = now - self._last_refresh_ms
        next_retry = self.config['retry_backoff_ms'] - retry_age

        return max(ttl, next_retry, 0)

    def refresh_backoff(self):
        """Return milliseconds to wait before attempting to retry after failure"""
        return self.config['retry_backoff_ms']

    def request_update(self):
        """Flags metadata for update, return Future()

        Actual update must be handled separately. This method will only
        change the reported ttl()

        Returns:
            kafka.future.Future (value will be the cluster object after update)
        """
        with self._lock:
            self._need_update = True
            # Reuse the pending future so concurrent requesters share it.
            if not self._future or self._future.is_done:
                self._future = Future()
            return self._future

    def topics(self, exclude_internal_topics=True):
        """Get set of known topics.

        Arguments:
            exclude_internal_topics (bool): Whether records from internal topics
                (such as offsets) should be exposed to the consumer. If set to
                True the only way to receive records from an internal topic is
                subscribing to it. Default True

        Returns:
            set: {topic (str), ...}
        """
        topics = set(self._partitions.keys())
        if exclude_internal_topics:
            return topics - self.internal_topics
        else:
            return topics

    def failed_update(self, exception):
        """Update cluster state given a failed MetadataRequest.

        Fails the pending request_update() future (if any) with `exception`
        and records the attempt time so ttl() applies the retry backoff.
        """
        f = None
        with self._lock:
            if self._future:
                f = self._future
                self._future = None
        # Resolve outside the lock so callbacks may call back into this object.
        if f:
            f.failure(exception)
        self._last_refresh_ms = time.time() * 1000

    def update_metadata(self, metadata):
        """Update cluster state given a MetadataResponse.

        Arguments:
            metadata (MetadataResponse): broker response to a metadata request

        Returns: None
        """
        # In the common case where we ask for a single topic and get back an
        # error, we should fail the future
        if len(metadata.topics) == 1 and metadata.topics[0][0] != 0:
            error_code, topic = metadata.topics[0][:2]
            error = Errors.for_code(error_code)(topic)
            return self.failed_update(error)

        if not metadata.brokers:
            log.warning("No broker metadata found in MetadataResponse -- ignoring.")
            return self.failed_update(Errors.MetadataEmptyBrokerList(metadata))

        _new_brokers = {}
        for broker in metadata.brokers:
            # API version 0 responses carry no rack field.
            if metadata.API_VERSION == 0:
                node_id, host, port = broker
                rack = None
            else:
                node_id, host, port, rack = broker
            _new_brokers[node_id] = BrokerMetadata(node_id, host, port, rack)

        if metadata.API_VERSION == 0:
            _new_controller = None
        else:
            _new_controller = _new_brokers.get(metadata.controller_id)

        _new_partitions = {}
        _new_broker_partitions = collections.defaultdict(set)
        _new_unauthorized_topics = set()
        _new_internal_topics = set()

        for topic_data in metadata.topics:
            if metadata.API_VERSION == 0:
                error_code, topic, partitions = topic_data
                is_internal = False
            else:
                error_code, topic, is_internal, partitions = topic_data
            if is_internal:
                _new_internal_topics.add(topic)
            error_type = Errors.for_code(error_code)
            if error_type is Errors.NoError:
                _new_partitions[topic] = {}
                for p_error, partition, leader, replicas, isr in partitions:
                    _new_partitions[topic][partition] = PartitionMetadata(
                        topic=topic, partition=partition, leader=leader,
                        replicas=replicas, isr=isr, error=p_error)
                    if leader != -1:
                        _new_broker_partitions[leader].add(
                            TopicPartition(topic, partition))

            elif error_type is Errors.LeaderNotAvailableError:
                log.warning("Topic %s is not available during auto-create"
                            " initialization", topic)
            elif error_type is Errors.UnknownTopicOrPartitionError:
                log.error("Topic %s not found in cluster metadata", topic)
            elif error_type is Errors.TopicAuthorizationFailedError:
                log.error("Topic %s is not authorized for this client", topic)
                _new_unauthorized_topics.add(topic)
            elif error_type is Errors.InvalidTopicError:
                log.error("'%s' is not a valid topic name", topic)
            else:
                log.error("Error fetching metadata for topic %s: %s",
                          topic, error_type)

        # Swap in the freshly built state in one step under the lock.
        with self._lock:
            self._brokers = _new_brokers
            self.controller = _new_controller
            self._partitions = _new_partitions
            self._broker_partitions = _new_broker_partitions
            self.unauthorized_topics = _new_unauthorized_topics
            self.internal_topics = _new_internal_topics
            f = None
            if self._future:
                f = self._future
            self._future = None
            self._need_update = False

        now = time.time() * 1000
        self._last_refresh_ms = now
        self._last_successful_refresh_ms = now

        if f:
            f.success(self)
        log.debug("Updated cluster metadata to %s", self)

        # Iterate over a snapshot: a listener may add or remove listeners,
        # which would otherwise raise "Set changed size during iteration".
        for listener in list(self._listeners):
            listener(self)

        if self.need_all_topic_metadata:
            # the listener may change the interested topics,
            # which could cause another metadata refresh.
            # If we have already fetched all topics, however,
            # another fetch should be unnecessary.
            self._need_update = False

    def add_listener(self, listener):
        """Add a callback function to be called on each metadata update"""
        self._listeners.add(listener)

    def remove_listener(self, listener):
        """Remove a previously added listener callback

        Raises:
            KeyError: if the listener was never added.
        """
        self._listeners.remove(listener)

    def add_group_coordinator(self, group, response):
        """Update with metadata for a group coordinator

        Arguments:
            group (str): name of group from GroupCoordinatorRequest
            response (GroupCoordinatorResponse): broker response

        Returns:
            bool: True if metadata is updated, False on error
        """
        log.debug("Updating coordinator for %s: %s", group, response)
        error_type = Errors.for_code(response.error_code)
        if error_type is not Errors.NoError:
            log.error("GroupCoordinatorResponse error: %s", error_type)
            # -1 records that the coordinator lookup for this group failed.
            self._groups[group] = -1
            return False

        node_id = response.coordinator_id
        coordinator = BrokerMetadata(
            node_id,
            response.host,
            response.port,
            None)

        # Assume that group coordinators are just brokers
        # (this is true now, but could diverge in future)
        if node_id not in self._brokers:
            self._brokers[node_id] = coordinator

        # If this happens, either brokers have moved without
        # changing IDs, or our assumption above is wrong
        else:
            node = self._brokers[node_id]
            if coordinator.host != node.host or coordinator.port != node.port:
                log.error("GroupCoordinator metadata conflicts with existing"
                          " broker metadata. Coordinator: %s, Broker: %s",
                          coordinator, node)
                self._groups[group] = node_id
                return False

        log.info("Group coordinator for %s is %s", group, coordinator)
        self._groups[group] = node_id
        return True

    def with_partitions(self, partitions_to_add):
        """Returns a copy of cluster metadata with partitions added

        The copy has its own deep-copied state. Each PartitionMetadata in
        partitions_to_add is inserted into the copy, replacing any existing
        entry for the same topic/partition; unknown topics are created.

        Arguments:
            partitions_to_add (iterable of PartitionMetadata): partitions to
                add or replace in the copy.

        Returns:
            ClusterMetadata: the new metadata object.
        """
        new_metadata = ClusterMetadata(**self.config)
        new_metadata._brokers = copy.deepcopy(self._brokers)
        new_metadata._partitions = copy.deepcopy(self._partitions)
        new_metadata._broker_partitions = copy.deepcopy(self._broker_partitions)
        new_metadata._groups = copy.deepcopy(self._groups)
        new_metadata.internal_topics = copy.deepcopy(self.internal_topics)
        new_metadata.unauthorized_topics = copy.deepcopy(self.unauthorized_topics)
        # BrokerMetadata is an immutable record, so sharing it is safe.
        new_metadata.controller = self.controller

        for partition in partitions_to_add:
            tp = TopicPartition(partition.topic, partition.partition)
            # setdefault: previously a topic not yet known raised KeyError.
            topic_partitions = new_metadata._partitions.setdefault(partition.topic, {})

            # When replacing existing metadata, drop the old leader's mapping
            # so partitions_for_broker() does not report a stale leader.
            previous = topic_partitions.get(partition.partition)
            if previous is not None:
                old_leader_tps = new_metadata._broker_partitions.get(previous.leader)
                if old_leader_tps is not None:
                    old_leader_tps.discard(tp)
                    if not old_leader_tps:
                        # Keep "no partitions -> None" semantics of
                        # partitions_for_broker().
                        del new_metadata._broker_partitions[previous.leader]

            topic_partitions[partition.partition] = partition

            if partition.leader is not None and partition.leader != -1:
                new_metadata._broker_partitions[partition.leader].add(tp)

        return new_metadata

    def __str__(self):
        return 'ClusterMetadata(brokers: %d, topics: %d, groups: %d)' % \
               (len(self._brokers), len(self._partitions), len(self._groups))