1from __future__ import absolute_import
2
3import collections
4import copy
5import logging
6import threading
7import time
8
9from kafka.vendor import six
10
11from kafka import errors as Errors
12from kafka.conn import collect_hosts, dns_lookup
13from kafka.future import Future
14from kafka.structs import BrokerMetadata, PartitionMetadata, TopicPartition
15
16log = logging.getLogger(__name__)
17
18
class ClusterMetadata(object):
    """
    A class to manage kafka cluster metadata.

    This class simply updates internal state given API responses
    (MetadataResponse, GroupCoordinatorResponse). The one exception to
    "no IO" visible here is the 'bootstrap' pseudo-broker: resolving it calls
    dns_lookup() (presumably a DNS query -- confirm against kafka.conn).

    Keyword Arguments:
        retry_backoff_ms (int): Milliseconds to backoff when retrying on
            errors. Default: 100.
        metadata_max_age_ms (int): The period of time in milliseconds after
            which we force a refresh of metadata even if we haven't seen any
            partition leadership changes to proactively discover any new
            brokers or partitions. Default: 300000
        bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
            strings) that the client should contact to bootstrap initial
            cluster metadata. This does not have to be the full node list.
            It just needs to have at least one broker that will respond to a
            Metadata API Request. Default port is 9092. If no servers are
            specified, will default to localhost:9092.
    """
    DEFAULT_CONFIG = {
        'retry_backoff_ms': 100,
        'metadata_max_age_ms': 300000,
        'bootstrap_servers': 'localhost',
    }

    def __init__(self, **configs):
        """Create an empty metadata container.

        Only keyword arguments whose names appear in DEFAULT_CONFIG are
        honoured; any other keyword is ignored.
        """
        self._brokers = {}  # node_id -> BrokerMetadata
        self._partitions = {}  # topic -> partition -> PartitionMetadata
        self._broker_partitions = collections.defaultdict(set)  # node_id -> {TopicPartition...}
        self._groups = {}  # group_name -> node_id
        self._last_refresh_ms = 0
        self._last_successful_refresh_ms = 0
        self._need_update = True
        self._future = None
        self._listeners = set()
        self._lock = threading.Lock()
        self.need_all_topic_metadata = False
        self.unauthorized_topics = set()
        self.internal_topics = set()
        self.controller = None

        self.config = copy.copy(self.DEFAULT_CONFIG)
        for key in self.config:
            if key in configs:
                self.config[key] = configs[key]

        # Generator object: nothing is resolved until the first next() call.
        self._bootstrap_brokers = self._generate_bootstrap_brokers()

    def _generate_bootstrap_brokers(self):
        """Cycle endlessly over the addresses of the bootstrap servers.

        Yields:
            BrokerMetadata with node_id 'bootstrap' for every address that
            dns_lookup() returns, or None once per complete pass over the
            configured hosts that produced no address at all.
        """
        # collect_hosts does not perform DNS, so we should be fine to re-use
        bootstrap_hosts = collect_hosts(self.config['bootstrap_servers'])

        while True:
            resolved_any = False
            for host, port, afi in bootstrap_hosts:
                for _, __, ___, ____, sockaddr in dns_lookup(host, port, afi):
                    resolved_any = True
                    yield BrokerMetadata('bootstrap', sockaddr[0], sockaddr[1], None)
            if not resolved_any:
                # Without this the generator would spin forever (and block
                # the caller of next()) when nothing can be resolved.
                yield None

    def brokers(self):
        """Get all BrokerMetadata

        Returns:
            set: {BrokerMetadata, ...}
        """
        return set(self._brokers.values())

    def broker_metadata(self, broker_id):
        """Get BrokerMetadata

        Arguments:
            broker_id (int): node_id for a broker to check, or the string
                'bootstrap' to obtain the next bootstrap server address.

        Returns:
            BrokerMetadata or None if not found
        """
        if broker_id == 'bootstrap':
            return next(self._bootstrap_brokers)

        return self._brokers.get(broker_id)

    def partitions_for_topic(self, topic):
        """Return set of all partitions for topic (whether available or not)

        Arguments:
            topic (str): topic to check for partitions

        Returns:
            set: {partition (int), ...}
            None if topic not found.
        """
        # Single lookup: update_metadata() may rebind self._partitions, so a
        # separate membership test followed by indexing could raise KeyError.
        topic_partitions = self._partitions.get(topic)
        if topic_partitions is None:
            return None
        return set(topic_partitions.keys())

    def available_partitions_for_topic(self, topic):
        """Return set of partitions with known leaders

        Arguments:
            topic (str): topic to check for partitions

        Returns:
            set: {partition (int), ...}
            None if topic not found.
        """
        topic_partitions = self._partitions.get(topic)
        if topic_partitions is None:
            return None
        # A leader of -1 marks a partition without a known leader.
        return set(partition for partition, metadata
                   in six.iteritems(topic_partitions)
                   if metadata.leader != -1)

    def leader_for_partition(self, partition):
        """Return node_id of leader, -1 unavailable, None if unknown.

        Arguments:
            partition: object with ``topic`` and ``partition`` attributes
                (e.g. a TopicPartition).
        """
        topic_partitions = self._partitions.get(partition.topic)
        if topic_partitions is None:
            return None
        metadata = topic_partitions.get(partition.partition)
        if metadata is None:
            return None
        return metadata.leader

    def partitions_for_broker(self, broker_id):
        """Return TopicPartitions for which the broker is a leader.

        Arguments:
            broker_id (int): node id for a broker

        Returns:
            set: {TopicPartition, ...}
            None if the broker either has no partitions or does not exist.
        """
        # .get() (not indexing) so the defaultdict does not grow on lookups.
        return self._broker_partitions.get(broker_id)

    def coordinator_for_group(self, group):
        """Return node_id of group coordinator.

        Arguments:
            group (str): name of consumer group

        Returns:
            int: node_id for group coordinator (-1 is stored after a
                GroupCoordinatorResponse that carried an error)
            None if the group does not exist.
        """
        return self._groups.get(group)

    def ttl(self):
        """Milliseconds until metadata should be refreshed

        The result is never negative and never shorter than the time left
        of the retry backoff since the last refresh attempt.
        """
        now = time.time() * 1000
        if self._need_update:
            ttl = 0
        else:
            metadata_age = now - self._last_successful_refresh_ms
            ttl = self.config['metadata_max_age_ms'] - metadata_age

        retry_age = now - self._last_refresh_ms
        next_retry = self.config['retry_backoff_ms'] - retry_age

        return max(ttl, next_retry, 0)

    def refresh_backoff(self):
        """Return milliseconds to wait before attempting to retry after failure"""
        return self.config['retry_backoff_ms']

    def request_update(self):
        """Flags metadata for update, return Future()

        Actual update must be handled separately. This method will only
        change the reported ttl()

        Returns:
            kafka.future.Future (value will be the cluster object after update)
        """
        with self._lock:
            self._need_update = True
            # Re-use a still-pending future so concurrent requesters all
            # wait on the same update.
            if not self._future or self._future.is_done:
                self._future = Future()
            return self._future

    def topics(self, exclude_internal_topics=True):
        """Get set of known topics.

        Arguments:
            exclude_internal_topics (bool): Whether records from internal topics
                (such as offsets) should be exposed to the consumer. If set to
                True the only way to receive records from an internal topic is
                subscribing to it. Default True

        Returns:
            set: {topic (str), ...}
        """
        topics = set(self._partitions.keys())
        if exclude_internal_topics:
            return topics - self.internal_topics
        return topics

    def failed_update(self, exception):
        """Update cluster state given a failed MetadataRequest.

        Fails the pending update future (if any) with ``exception`` and
        records the attempt time so ttl() applies the retry backoff.

        Arguments:
            exception: error to hand to the pending future.
        """
        f = None
        with self._lock:
            if self._future:
                f = self._future
                self._future = None
        # Resolve the future outside the lock.
        if f:
            f.failure(exception)
        self._last_refresh_ms = time.time() * 1000

    def update_metadata(self, metadata):
        """Update cluster state given a MetadataResponse.

        All broker / partition state is rebuilt from the response and swapped
        in under the lock; the pending update future (if any) succeeds with
        this object and every registered listener is called.

        If the response has exactly one topic and that topic carries an
        error, or if the response lists no brokers, nothing is replaced and
        the update is treated as failed (see failed_update()).

        Arguments:
            metadata (MetadataResponse): broker response to a metadata request

        Returns: None
        """
        # In the common case where we ask for a single topic and get back an
        # error, we should fail the future
        if len(metadata.topics) == 1 and metadata.topics[0][0] != 0:
            error_code, topic = metadata.topics[0][:2]
            error = Errors.for_code(error_code)(topic)
            return self.failed_update(error)

        if not metadata.brokers:
            log.warning("No broker metadata found in MetadataResponse -- ignoring.")
            return self.failed_update(Errors.MetadataEmptyBrokerList(metadata))

        _new_brokers = {}
        for broker in metadata.brokers:
            # Version 0 broker entries have no rack field.
            if metadata.API_VERSION == 0:
                node_id, host, port = broker
                rack = None
            else:
                node_id, host, port, rack = broker
            _new_brokers[node_id] = BrokerMetadata(node_id, host, port, rack)

        if metadata.API_VERSION == 0:
            _new_controller = None
        else:
            _new_controller = _new_brokers.get(metadata.controller_id)

        _new_partitions = {}
        _new_broker_partitions = collections.defaultdict(set)
        _new_unauthorized_topics = set()
        _new_internal_topics = set()

        for topic_data in metadata.topics:
            # Version 0 topic entries have no is_internal flag.
            if metadata.API_VERSION == 0:
                error_code, topic, partitions = topic_data
                is_internal = False
            else:
                error_code, topic, is_internal, partitions = topic_data
            if is_internal:
                _new_internal_topics.add(topic)
            error_type = Errors.for_code(error_code)
            if error_type is Errors.NoError:
                _new_partitions[topic] = {}
                for p_error, partition, leader, replicas, isr in partitions:
                    _new_partitions[topic][partition] = PartitionMetadata(
                        topic=topic, partition=partition, leader=leader,
                        replicas=replicas, isr=isr, error=p_error)
                    if leader != -1:
                        _new_broker_partitions[leader].add(
                            TopicPartition(topic, partition))

            elif error_type is Errors.LeaderNotAvailableError:
                log.warning("Topic %s is not available during auto-create"
                            " initialization", topic)
            elif error_type is Errors.UnknownTopicOrPartitionError:
                log.error("Topic %s not found in cluster metadata", topic)
            elif error_type is Errors.TopicAuthorizationFailedError:
                log.error("Topic %s is not authorized for this client", topic)
                _new_unauthorized_topics.add(topic)
            elif error_type is Errors.InvalidTopicError:
                log.error("'%s' is not a valid topic name", topic)
            else:
                log.error("Error fetching metadata for topic %s: %s",
                          topic, error_type)

        with self._lock:
            self._brokers = _new_brokers
            self.controller = _new_controller
            self._partitions = _new_partitions
            self._broker_partitions = _new_broker_partitions
            self.unauthorized_topics = _new_unauthorized_topics
            self.internal_topics = _new_internal_topics
            f = None
            if self._future:
                f = self._future
            self._future = None
            self._need_update = False

        now = time.time() * 1000
        self._last_refresh_ms = now
        self._last_successful_refresh_ms = now

        if f:
            f.success(self)
        log.debug("Updated cluster metadata to %s", self)

        # Iterate over a snapshot so a listener may add or remove listeners
        # without triggering "Set changed size during iteration".
        for listener in list(self._listeners):
            listener(self)

        if self.need_all_topic_metadata:
            # the listener may change the interested topics,
            # which could cause another metadata refresh.
            # If we have already fetched all topics, however,
            # another fetch should be unnecessary.
            self._need_update = False

    def add_listener(self, listener):
        """Add a callback function to be called on each metadata update

        The callback receives this ClusterMetadata instance as its only
        argument.
        """
        self._listeners.add(listener)

    def remove_listener(self, listener):
        """Remove a previously added listener callback

        Raises:
            KeyError: if the listener is not currently registered.
        """
        self._listeners.remove(listener)

    def add_group_coordinator(self, group, response):
        """Update with metadata for a group coordinator

        Arguments:
            group (str): name of group from GroupCoordinatorRequest
            response (GroupCoordinatorResponse): broker response

        Returns:
            bool: True if metadata is updated, False on error
        """
        log.debug("Updating coordinator for %s: %s", group, response)
        error_type = Errors.for_code(response.error_code)
        if error_type is not Errors.NoError:
            log.error("GroupCoordinatorResponse error: %s", error_type)
            self._groups[group] = -1
            return False

        node_id = response.coordinator_id
        coordinator = BrokerMetadata(
            response.coordinator_id,
            response.host,
            response.port,
            None)

        # Assume that group coordinators are just brokers
        # (this is true now, but could diverge in future)
        if node_id not in self._brokers:
            self._brokers[node_id] = coordinator

        # If this happens, either brokers have moved without
        # changing IDs, or our assumption above is wrong
        else:
            node = self._brokers[node_id]
            if coordinator.host != node.host or coordinator.port != node.port:
                log.error("GroupCoordinator metadata conflicts with existing"
                          " broker metadata. Coordinator: %s, Broker: %s",
                          coordinator, node)
                # The group is still mapped to node_id (the existing broker
                # entry is kept), but the conflict is reported as failure.
                self._groups[group] = node_id
                return False

        log.info("Group coordinator for %s is %s", group, coordinator)
        self._groups[group] = node_id
        return True

    def with_partitions(self, partitions_to_add):
        """Returns a copy of cluster metadata with partitions added

        Arguments:
            partitions_to_add: iterable of objects exposing ``topic``,
                ``partition`` and ``leader`` attributes (e.g.
                PartitionMetadata). Topics not yet known to this instance
                are created in the copy.

        Returns:
            ClusterMetadata: new instance; this instance is left unmodified.
        """
        new_metadata = ClusterMetadata(**self.config)
        new_metadata._brokers = copy.deepcopy(self._brokers)
        new_metadata._partitions = copy.deepcopy(self._partitions)
        new_metadata._broker_partitions = copy.deepcopy(self._broker_partitions)
        new_metadata._groups = copy.deepcopy(self._groups)
        new_metadata.internal_topics = copy.deepcopy(self.internal_topics)
        new_metadata.unauthorized_topics = copy.deepcopy(self.unauthorized_topics)

        for partition in partitions_to_add:
            # _partitions is a plain dict: create the per-topic mapping on
            # demand instead of raising KeyError for a previously unseen topic.
            topic_partitions = new_metadata._partitions.setdefault(
                partition.topic, {})
            topic_partitions[partition.partition] = partition

            if partition.leader is not None and partition.leader != -1:
                new_metadata._broker_partitions[partition.leader].add(
                    TopicPartition(partition.topic, partition.partition))

        return new_metadata

    def __str__(self):
        """Short summary with the number of known brokers, topics and groups."""
        return 'ClusterMetadata(brokers: %d, topics: %d, groups: %d)' % \
               (len(self._brokers), len(self._partitions), len(self._groups))
401