1"""
2Kafka Admin client: create, view, alter, delete topics and resources.
3"""
4from ..cimpl import (KafkaException, # noqa
5                     _AdminClientImpl,
6                     NewTopic,
7                     NewPartitions,
8                     CONFIG_SOURCE_UNKNOWN_CONFIG,
9                     CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG,
10                     CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG,
11                     CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG,
12                     CONFIG_SOURCE_STATIC_BROKER_CONFIG,
13                     CONFIG_SOURCE_DEFAULT_CONFIG,
14                     RESOURCE_UNKNOWN,
15                     RESOURCE_ANY,
16                     RESOURCE_TOPIC,
17                     RESOURCE_GROUP,
18                     RESOURCE_BROKER)
19
20import concurrent.futures
21import functools
22
23from enum import Enum
24
25
26class ConfigSource(Enum):
27    """
28    Config sources returned in ConfigEntry by `describe_configs()`.
29    """
30    UNKNOWN_CONFIG = CONFIG_SOURCE_UNKNOWN_CONFIG  #:
31    DYNAMIC_TOPIC_CONFIG = CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG  #:
32    DYNAMIC_BROKER_CONFIG = CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG  #:
33    DYNAMIC_DEFAULT_BROKER_CONFIG = CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG  #:
34    STATIC_BROKER_CONFIG = CONFIG_SOURCE_STATIC_BROKER_CONFIG  #:
35    DEFAULT_CONFIG = CONFIG_SOURCE_DEFAULT_CONFIG  #:
36
37
38class ConfigEntry(object):
39    """
40    ConfigEntry is returned by describe_configs() for each configuration
41    entry for the specified resource.
42
43    This class is typically not user instantiated.
44
45    :ivar str name: Configuration property name.
46    :ivar str value: Configuration value (or None if not set or is_sensitive==True).
47    :ivar ConfigSource source: Configuration source.
48    :ivar bool is_read_only: Indicates if configuration property is read-only.
49    :ivar bool is_default: Indicates if configuration property is using its default value.
50    :ivar bool is_sensitive: Indicates if configuration property value
51                              contains sensitive information
52                              (such as security settings),
53                              in which case .value is None.
54    :ivar bool is_synonym: Indicates if configuration property is a
55                            synonym for the parent configuration entry.
56    :ivar list synonyms: A ConfigEntry list of synonyms and alternate sources for this configuration property.
57
58    """
59
60    def __init__(self, name, value,
61                 source=ConfigSource.UNKNOWN_CONFIG,
62                 is_read_only=False,
63                 is_default=False,
64                 is_sensitive=False,
65                 is_synonym=False,
66                 synonyms=[]):
67        """
68        This class is typically not user instantiated.
69        """
70        super(ConfigEntry, self).__init__()
71        self.name = name
72        self.value = value
73        self.source = source
74        self.is_read_only = bool(is_read_only)
75        self.is_default = bool(is_default)
76        self.is_sensitive = bool(is_sensitive)
77        self.is_synonym = bool(is_synonym)
78        self.synonyms = synonyms
79
80    def __repr__(self):
81        return "ConfigEntry(%s=\"%s\")" % (self.name, self.value)
82
83    def __str__(self):
84        return "%s=\"%s\"" % (self.name, self.value)
85
86
87@functools.total_ordering
88class ConfigResource(object):
89    """
90    Class representing resources that have configs.
91
92    Instantiate with a resource type and a resource name.
93    """
94
95    class Type(Enum):
96        """
97        ConfigResource.Type depicts the type of a Kafka resource.
98        """
99        UNKNOWN = RESOURCE_UNKNOWN  #: Resource type is not known or not set.
100        ANY = RESOURCE_ANY  #: Match any resource, used for lookups.
101        TOPIC = RESOURCE_TOPIC  #: Topic resource. Resource name is topic name
102        GROUP = RESOURCE_GROUP  #: Group resource. Resource name is group.id
103        BROKER = RESOURCE_BROKER  #: Broker resource. Resource name is broker id
104
105    def __init__(self, restype, name,
106                 set_config=None, described_configs=None, error=None):
107        """
108        :param ConfigResource.Type restype: Resource type.
109        :param str name: Resource name, depending on restype.
110                          For RESOURCE_BROKER the resource name is the broker id.
111        :param dict set_config: Configuration to set/overwrite. Dict of str, str.
112        :param dict described_configs: For internal use only.
113        :param KafkaError error: For internal use only.
114        """
115        super(ConfigResource, self).__init__()
116
117        if name is None:
118            raise ValueError("Expected resource name to be a string")
119
120        if type(restype) == str:
121            # Allow resource type to be specified as case-insensitive string, for convenience.
122            try:
123                restype = ConfigResource.Type[restype.upper()]
124            except KeyError:
125                raise ValueError("Unknown resource type \"%s\": should be a ConfigResource.Type" % restype)
126
127        elif type(restype) == int:
128            # The C-code passes restype as an int, convert to Type.
129            restype = ConfigResource.Type(restype)
130
131        self.restype = restype
132        self.restype_int = int(self.restype.value)  # for the C code
133        self.name = name
134
135        if set_config is not None:
136            self.set_config_dict = set_config.copy()
137        else:
138            self.set_config_dict = dict()
139
140        self.configs = described_configs
141        self.error = error
142
143    def __repr__(self):
144        if self.error is not None:
145            return "ConfigResource(%s,%s,%r)" % (self.restype, self.name, self.error)
146        else:
147            return "ConfigResource(%s,%s)" % (self.restype, self.name)
148
149    def __hash__(self):
150        return hash((self.restype, self.name))
151
152    def __lt__(self, other):
153        if self.restype < other.restype:
154            return True
155        return self.name.__lt__(other.name)
156
157    def __eq__(self, other):
158        return self.restype == other.restype and self.name == other.name
159
160    def __len__(self):
161        """
162        :rtype: int
163        :returns: number of configuration entries/operations
164        """
165        return len(self.set_config_dict)
166
167    def set_config(self, name, value, overwrite=True):
168        """
169        Set/Overwrite configuration entry
170
171        Any configuration properties that are not included will be reverted to their default values.
172        As a workaround use describe_configs() to retrieve the current configuration and
173        overwrite the settings you want to change.
174
175        :param str name: Configuration property name
176        :param str value: Configuration value
177        :param bool overwrite: If True overwrite entry if already exists (default).
178                               If False do nothing if entry already exists.
179        """
180        if not overwrite and name in self.set_config_dict:
181            return
182        self.set_config_dict[name] = value
183
184
185class AdminClient (_AdminClientImpl):
186    """
187    The Kafka AdminClient provides admin operations for Kafka brokers,
188    topics, groups, and other resource types supported by the broker.
189
190    The Admin API methods are asynchronous and returns a dict of
191    concurrent.futures.Future objects keyed by the entity.
192    The entity is a topic name for create_topics(), delete_topics(), create_partitions(),
193    and a ConfigResource for alter_configs(), describe_configs().
194
195    All the futures for a single API call will currently finish/fail at
196    the same time (backed by the same protocol request), but this might
197    change in future versions of the client.
198
199    See examples/adminapi.py for example usage.
200
201    For more information see the Java Admin API documentation:
202    https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/clients/admin/package-frame.html
203
204    Requires broker version v0.11.0.0 or later.
205    """
206    def __init__(self, conf):
207        """
208        Create a new AdminClient using the provided configuration dictionary.
209
210        The AdminClient is a standard Kafka protocol client, supporting
211        the standard librdkafka configuration properties as specified at
212        https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
213
214        At least 'bootstrap.servers' should be configured.
215        """
216        super(AdminClient, self).__init__(conf)
217
218    @staticmethod
219    def _make_topics_result(f, futmap):
220        """
221        Map per-topic results to per-topic futures in futmap.
222        The result value of each (successful) future is None.
223        """
224        try:
225            result = f.result()
226            for topic, error in result.items():
227                fut = futmap.get(topic, None)
228                if fut is None:
229                    raise RuntimeError("Topic {} not found in future-map: {}".format(topic, futmap))
230
231                if error is not None:
232                    # Topic-level exception
233                    fut.set_exception(KafkaException(error))
234                else:
235                    # Topic-level success
236                    fut.set_result(None)
237        except Exception as e:
238            # Request-level exception, raise the same for all topics
239            for topic, fut in futmap.items():
240                fut.set_exception(e)
241
242    @staticmethod
243    def _make_resource_result(f, futmap):
244        """
245        Map per-resource results to per-resource futures in futmap.
246        The result value of each (successful) future is a ConfigResource.
247        """
248        try:
249            result = f.result()
250            for resource, configs in result.items():
251                fut = futmap.get(resource, None)
252                if fut is None:
253                    raise RuntimeError("Resource {} not found in future-map: {}".format(resource, futmap))
254                if resource.error is not None:
255                    # Resource-level exception
256                    fut.set_exception(KafkaException(resource.error))
257                else:
258                    # Resource-level success
259                    # configs will be a dict for describe_configs()
260                    # and None for alter_configs()
261                    fut.set_result(configs)
262        except Exception as e:
263            # Request-level exception, raise the same for all resources
264            for resource, fut in futmap.items():
265                fut.set_exception(e)
266
267    @staticmethod
268    def _make_futures(futmap_keys, class_check, make_result_fn):
269        """
270        Create futures and a futuremap for the keys in futmap_keys,
271        and create a request-level future to be bassed to the C API.
272        """
273        futmap = {}
274        for key in futmap_keys:
275            if class_check is not None and not isinstance(key, class_check):
276                raise ValueError("Expected list of {}".format(type(class_check)))
277            futmap[key] = concurrent.futures.Future()
278            if not futmap[key].set_running_or_notify_cancel():
279                raise RuntimeError("Future was cancelled prematurely")
280
281        # Create an internal future for the entire request,
282        # this future will trigger _make_..._result() and set result/exception
283        # per topic,future in futmap.
284        f = concurrent.futures.Future()
285        f.add_done_callback(lambda f: make_result_fn(f, futmap))
286
287        if not f.set_running_or_notify_cancel():
288            raise RuntimeError("Future was cancelled prematurely")
289
290        return f, futmap
291
292    def create_topics(self, new_topics, **kwargs):
293        """
294        Create new topics in cluster.
295
296        The future result() value is None.
297
298        :param list(NewTopic) new_topics: New topics to be created.
299        :param float operation_timeout: Set broker's operation timeout in seconds,
300                  controlling how long the CreateTopics request will block
301                  on the broker waiting for the topic creation to propagate
302                  in the cluster. A value of 0 returns immediately. Default: 0
303        :param float request_timeout: Set the overall request timeout in seconds,
304                  including broker lookup, request transmission, operation time
305                  on broker, and response. Default: `socket.timeout.ms*1000.0`
306        :param bool validate_only: Tell broker to only validate the request,
307                  without creating the topic. Default: False
308
309        :returns: a dict of futures for each topic, keyed by the topic name.
310        :rtype: dict(<topic_name, future>)
311
312        :raises KafkaException: Operation failed locally or on broker.
313        :raises TypeException: Invalid input.
314        :raises ValueException: Invalid input.
315        """
316
317        f, futmap = AdminClient._make_futures([x.topic for x in new_topics],
318                                              None,
319                                              AdminClient._make_topics_result)
320
321        super(AdminClient, self).create_topics(new_topics, f, **kwargs)
322
323        return futmap
324
325    def delete_topics(self, topics, **kwargs):
326        """
327        Delete topics.
328
329        The future result() value is None.
330
331        :param list(str) topics: Topics to mark for deletion.
332        :param float operation_timeout: Set broker's operation timeout in seconds,
333                  controlling how long the DeleteTopics request will block
334                  on the broker waiting for the topic deletion to propagate
335                  in the cluster. A value of 0 returns immediately. Default: 0
336        :param float request_timeout: Set the overall request timeout in seconds,
337                  including broker lookup, request transmission, operation time
338                  on broker, and response. Default: `socket.timeout.ms*1000.0`
339
340        :returns: a dict of futures for each topic, keyed by the topic name.
341        :rtype: dict(<topic_name, future>)
342
343        :raises KafkaException: Operation failed locally or on broker.
344        :raises TypeException: Invalid input.
345        :raises ValueException: Invalid input.
346        """
347
348        f, futmap = AdminClient._make_futures(topics, None,
349                                              AdminClient._make_topics_result)
350
351        super(AdminClient, self).delete_topics(topics, f, **kwargs)
352
353        return futmap
354
355    def create_partitions(self, new_partitions, **kwargs):
356        """
357        Create additional partitions for the given topics.
358
359        The future result() value is None.
360
361        :param list(NewPartitions) new_partitions: New partitions to be created.
362        :param float operation_timeout: Set broker's operation timeout in seconds,
363                  controlling how long the CreatePartitions request will block
364                  on the broker waiting for the partition creation to propagate
365                  in the cluster. A value of 0 returns immediately. Default: 0
366        :param float request_timeout: Set the overall request timeout in seconds,
367                  including broker lookup, request transmission, operation time
368                  on broker, and response. Default: `socket.timeout.ms*1000.0`
369        :param bool validate_only: Tell broker to only validate the request,
370                  without creating the partitions. Default: False
371
372        :returns: a dict of futures for each topic, keyed by the topic name.
373        :rtype: dict(<topic_name, future>)
374
375        :raises KafkaException: Operation failed locally or on broker.
376        :raises TypeException: Invalid input.
377        :raises ValueException: Invalid input.
378        """
379
380        f, futmap = AdminClient._make_futures([x.topic for x in new_partitions],
381                                              None,
382                                              AdminClient._make_topics_result)
383
384        super(AdminClient, self).create_partitions(new_partitions, f, **kwargs)
385
386        return futmap
387
388    def describe_configs(self, resources, **kwargs):
389        """
390        Get configuration for the specified resources.
391
392        The future result() value is a dict(<configname, ConfigEntry>).
393
394        :warning: Multiple resources and resource types may be requested,
395                  but at most one resource of type RESOURCE_BROKER is allowed
396                  per call since these resource requests must be sent to the
397                  broker specified in the resource.
398
399        :param list(ConfigResource) resources: Resources to get configuration for.
400        :param float request_timeout: Set the overall request timeout in seconds,
401                  including broker lookup, request transmission, operation time
402                  on broker, and response. Default: `socket.timeout.ms*1000.0`
403        :param bool validate_only: Tell broker to only validate the request,
404                  without creating the partitions. Default: False
405
406        :returns: a dict of futures for each resource, keyed by the ConfigResource.
407        :rtype: dict(<ConfigResource, future>)
408
409        :raises KafkaException: Operation failed locally or on broker.
410        :raises TypeException: Invalid input.
411        :raises ValueException: Invalid input.
412        """
413
414        f, futmap = AdminClient._make_futures(resources, ConfigResource,
415                                              AdminClient._make_resource_result)
416
417        super(AdminClient, self).describe_configs(resources, f, **kwargs)
418
419        return futmap
420
421    def alter_configs(self, resources, **kwargs):
422        """
423        Update configuration values for the specified resources.
424        Updates are not transactional so they may succeed for a subset
425        of the provided resources while the others fail.
426        The configuration for a particular resource is updated atomically,
427        replacing the specified values while reverting unspecified configuration
428        entries to their default values.
429
430        The future result() value is None.
431
432        :warning: alter_configs() will replace all existing configuration for
433                  the provided resources with the new configuration given,
434                  reverting all other configuration for the resource back
435                  to their default values.
436
437        :warning: Multiple resources and resource types may be specified,
438                  but at most one resource of type RESOURCE_BROKER is allowed
439                  per call since these resource requests must be sent to the
440                  broker specified in the resource.
441
442        :param list(ConfigResource) resources: Resources to update configuration for.
443        :param float request_timeout: Set the overall request timeout in seconds,
444                  including broker lookup, request transmission, operation time
445                  on broker, and response. Default: `socket.timeout.ms*1000.0`.
446        :param bool validate_only: Tell broker to only validate the request,
447                  without altering the configuration. Default: False
448
449        :returns: a dict of futures for each resource, keyed by the ConfigResource.
450        :rtype: dict(<ConfigResource, future>)
451
452        :raises KafkaException: Operation failed locally or on broker.
453        :raises TypeException: Invalid input.
454        :raises ValueException: Invalid input.
455        """
456
457        f, futmap = AdminClient._make_futures(resources, ConfigResource,
458                                              AdminClient._make_resource_result)
459
460        super(AdminClient, self).alter_configs(resources, f, **kwargs)
461
462        return futmap
463
464
465class ClusterMetadata (object):
466    """
467    ClusterMetadata as returned by list_topics() contains information
468    about the Kafka cluster, brokers, and topics.
469
470    This class is typically not user instantiated.
471
472    :ivar str cluster_id: Cluster id string, if supported by broker, else None.
473    :ivar id controller_id: Current controller broker id, or -1.
474    :ivar dict brokers: Map of brokers indexed by the int broker id. Value is BrokerMetadata object.
475    :ivar dict topics: Map of topics indexed by the topic name. Value is TopicMetadata object.
476    :ivar int orig_broker_id: The broker this metadata originated from.
477    :ivar str orig_broker_name: Broker name/address this metadata originated from.
478    """
479    def __init__(self):
480        self.cluster_id = None
481        self.controller_id = -1
482        self.brokers = {}
483        self.topics = {}
484        self.orig_broker_id = -1
485        self.orig_broker_name = None
486
487    def __repr__(self):
488        return "ClusterMetadata({})".format(self.cluster_id)
489
490    def __str__(self):
491        return str(self.cluster_id)
492
493
494class BrokerMetadata (object):
495    """
496    BrokerMetadata contains information about a Kafka broker.
497
498    This class is typically not user instantiated.
499
500    :ivar int id: Broker id.
501    :ivar str host: Broker hostname.
502    :ivar int port: Broker port.
503    """
504    def __init__(self):
505        self.id = -1
506        self.host = None
507        self.port = -1
508
509    def __repr__(self):
510        return "BrokerMetadata({}, {}:{})".format(self.id, self.host, self.port)
511
512    def __str__(self):
513        return "{}:{}/{}".format(self.host, self.port, self.id)
514
515
516class TopicMetadata (object):
517    """
518    TopicMetadata contains information about a Kafka topic.
519
520    This class is typically not user instantiated.
521
522    :ivar str topic: Topic name.
523    :ivar dict partitions: Map of partitions indexed by partition id. Value is PartitionMetadata object.
524    :ivar KafkaError error: Topic error, or None. Value is a KafkaError object.
525    """
526    def __init__(self):
527        self.topic = None
528        self.partitions = {}
529        self.error = None
530
531    def __repr__(self):
532        if self.error is not None:
533            return "TopicMetadata({}, {} partitions, {})".format(self.topic, len(self.partitions), self.error)
534        else:
535            return "TopicMetadata({}, {} partitions)".format(self.topic, len(self.partitions))
536
537    def __str__(self):
538        return self.topic
539
540
541class PartitionMetadata (object):
542    """
543    PartitionsMetadata contains information about a Kafka partition.
544
545    This class is typically not user instantiated.
546
547    :ivar int id: Partition id.
548    :ivar int leader: Current leader broker for this partition, or -1.
549    :ivar list(int) replicas: List of replica broker ids for this partition.
550    :ivar list(int) isrs: List of in-sync-replica broker ids for this partition.
551    :ivar KafkaError error: Partition error, or None. Value is a KafkaError object.
552
553    :warning: Depending on cluster state the broker ids referenced in
554              leader, replicas and isrs may temporarily not be reported
555              in ClusterMetadata.brokers. Always check the availability
556              of a broker id in the brokers dict.
557    """
558    def __init__(self):
559        self.id = -1
560        self.leader = -1
561        self.replicas = []
562        self.isrs = []
563        self.error = None
564
565    def __repr__(self):
566        if self.error is not None:
567            return "PartitionMetadata({}, {})".format(self.id, self.error)
568        else:
569            return "PartitionMetadata({})".format(self.id)
570
571    def __str__(self):
572        return "{}".format(self.id)
573