1""" 2Kafka Admin client: create, view, alter, delete topics and resources. 3""" 4from ..cimpl import (KafkaException, # noqa 5 _AdminClientImpl, 6 NewTopic, 7 NewPartitions, 8 CONFIG_SOURCE_UNKNOWN_CONFIG, 9 CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG, 10 CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG, 11 CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG, 12 CONFIG_SOURCE_STATIC_BROKER_CONFIG, 13 CONFIG_SOURCE_DEFAULT_CONFIG, 14 RESOURCE_UNKNOWN, 15 RESOURCE_ANY, 16 RESOURCE_TOPIC, 17 RESOURCE_GROUP, 18 RESOURCE_BROKER) 19 20import concurrent.futures 21import functools 22 23from enum import Enum 24 25 26class ConfigSource(Enum): 27 """ 28 Config sources returned in ConfigEntry by `describe_configs()`. 29 """ 30 UNKNOWN_CONFIG = CONFIG_SOURCE_UNKNOWN_CONFIG #: 31 DYNAMIC_TOPIC_CONFIG = CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG #: 32 DYNAMIC_BROKER_CONFIG = CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG #: 33 DYNAMIC_DEFAULT_BROKER_CONFIG = CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG #: 34 STATIC_BROKER_CONFIG = CONFIG_SOURCE_STATIC_BROKER_CONFIG #: 35 DEFAULT_CONFIG = CONFIG_SOURCE_DEFAULT_CONFIG #: 36 37 38class ConfigEntry(object): 39 """ 40 ConfigEntry is returned by describe_configs() for each configuration 41 entry for the specified resource. 42 43 This class is typically not user instantiated. 44 45 :ivar str name: Configuration property name. 46 :ivar str value: Configuration value (or None if not set or is_sensitive==True). 47 :ivar ConfigSource source: Configuration source. 48 :ivar bool is_read_only: Indicates if configuration property is read-only. 49 :ivar bool is_default: Indicates if configuration property is using its default value. 50 :ivar bool is_sensitive: Indicates if configuration property value 51 contains sensitive information 52 (such as security settings), 53 in which case .value is None. 54 :ivar bool is_synonym: Indicates if configuration property is a 55 synonym for the parent configuration entry. 56 :ivar list synonyms: A ConfigEntry list of synonyms and alternate sources for this configuration property. 57 58 """ 59 60 def __init__(self, name, value, 61 source=ConfigSource.UNKNOWN_CONFIG, 62 is_read_only=False, 63 is_default=False, 64 is_sensitive=False, 65 is_synonym=False, 66 synonyms=[]): 67 """ 68 This class is typically not user instantiated. 69 """ 70 super(ConfigEntry, self).__init__() 71 self.name = name 72 self.value = value 73 self.source = source 74 self.is_read_only = bool(is_read_only) 75 self.is_default = bool(is_default) 76 self.is_sensitive = bool(is_sensitive) 77 self.is_synonym = bool(is_synonym) 78 self.synonyms = synonyms 79 80 def __repr__(self): 81 return "ConfigEntry(%s=\"%s\")" % (self.name, self.value) 82 83 def __str__(self): 84 return "%s=\"%s\"" % (self.name, self.value) 85 86 87@functools.total_ordering 88class ConfigResource(object): 89 """ 90 Class representing resources that have configs. 91 92 Instantiate with a resource type and a resource name. 93 """ 94 95 class Type(Enum): 96 """ 97 ConfigResource.Type depicts the type of a Kafka resource. 98 """ 99 UNKNOWN = RESOURCE_UNKNOWN #: Resource type is not known or not set. 100 ANY = RESOURCE_ANY #: Match any resource, used for lookups. 101 TOPIC = RESOURCE_TOPIC #: Topic resource. Resource name is topic name 102 GROUP = RESOURCE_GROUP #: Group resource. Resource name is group.id 103 BROKER = RESOURCE_BROKER #: Broker resource. Resource name is broker id 104 105 def __init__(self, restype, name, 106 set_config=None, described_configs=None, error=None): 107 """ 108 :param ConfigResource.Type restype: Resource type. 109 :param str name: Resource name, depending on restype. 110 For RESOURCE_BROKER the resource name is the broker id. 111 :param dict set_config: Configuration to set/overwrite. Dict of str, str. 112 :param dict described_configs: For internal use only. 113 :param KafkaError error: For internal use only. 114 """ 115 super(ConfigResource, self).__init__() 116 117 if name is None: 118 raise ValueError("Expected resource name to be a string") 119 120 if type(restype) == str: 121 # Allow resource type to be specified as case-insensitive string, for convenience. 122 try: 123 restype = ConfigResource.Type[restype.upper()] 124 except KeyError: 125 raise ValueError("Unknown resource type \"%s\": should be a ConfigResource.Type" % restype) 126 127 elif type(restype) == int: 128 # The C-code passes restype as an int, convert to Type. 129 restype = ConfigResource.Type(restype) 130 131 self.restype = restype 132 self.restype_int = int(self.restype.value) # for the C code 133 self.name = name 134 135 if set_config is not None: 136 self.set_config_dict = set_config.copy() 137 else: 138 self.set_config_dict = dict() 139 140 self.configs = described_configs 141 self.error = error 142 143 def __repr__(self): 144 if self.error is not None: 145 return "ConfigResource(%s,%s,%r)" % (self.restype, self.name, self.error) 146 else: 147 return "ConfigResource(%s,%s)" % (self.restype, self.name) 148 149 def __hash__(self): 150 return hash((self.restype, self.name)) 151 152 def __lt__(self, other): 153 if self.restype < other.restype: 154 return True 155 return self.name.__lt__(other.name) 156 157 def __eq__(self, other): 158 return self.restype == other.restype and self.name == other.name 159 160 def __len__(self): 161 """ 162 :rtype: int 163 :returns: number of configuration entries/operations 164 """ 165 return len(self.set_config_dict) 166 167 def set_config(self, name, value, overwrite=True): 168 """ 169 Set/Overwrite configuration entry 170 171 Any configuration properties that are not included will be reverted to their default values. 172 As a workaround use describe_configs() to retrieve the current configuration and 173 overwrite the settings you want to change. 174 175 :param str name: Configuration property name 176 :param str value: Configuration value 177 :param bool overwrite: If True overwrite entry if already exists (default). 178 If False do nothing if entry already exists. 179 """ 180 if not overwrite and name in self.set_config_dict: 181 return 182 self.set_config_dict[name] = value 183 184 185class AdminClient (_AdminClientImpl): 186 """ 187 The Kafka AdminClient provides admin operations for Kafka brokers, 188 topics, groups, and other resource types supported by the broker. 189 190 The Admin API methods are asynchronous and returns a dict of 191 concurrent.futures.Future objects keyed by the entity. 192 The entity is a topic name for create_topics(), delete_topics(), create_partitions(), 193 and a ConfigResource for alter_configs(), describe_configs(). 194 195 All the futures for a single API call will currently finish/fail at 196 the same time (backed by the same protocol request), but this might 197 change in future versions of the client. 198 199 See examples/adminapi.py for example usage. 200 201 For more information see the Java Admin API documentation: 202 https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/clients/admin/package-frame.html 203 204 Requires broker version v0.11.0.0 or later. 205 """ 206 def __init__(self, conf): 207 """ 208 Create a new AdminClient using the provided configuration dictionary. 209 210 The AdminClient is a standard Kafka protocol client, supporting 211 the standard librdkafka configuration properties as specified at 212 https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md 213 214 At least 'bootstrap.servers' should be configured. 215 """ 216 super(AdminClient, self).__init__(conf) 217 218 @staticmethod 219 def _make_topics_result(f, futmap): 220 """ 221 Map per-topic results to per-topic futures in futmap. 222 The result value of each (successful) future is None. 223 """ 224 try: 225 result = f.result() 226 for topic, error in result.items(): 227 fut = futmap.get(topic, None) 228 if fut is None: 229 raise RuntimeError("Topic {} not found in future-map: {}".format(topic, futmap)) 230 231 if error is not None: 232 # Topic-level exception 233 fut.set_exception(KafkaException(error)) 234 else: 235 # Topic-level success 236 fut.set_result(None) 237 except Exception as e: 238 # Request-level exception, raise the same for all topics 239 for topic, fut in futmap.items(): 240 fut.set_exception(e) 241 242 @staticmethod 243 def _make_resource_result(f, futmap): 244 """ 245 Map per-resource results to per-resource futures in futmap. 246 The result value of each (successful) future is a ConfigResource. 247 """ 248 try: 249 result = f.result() 250 for resource, configs in result.items(): 251 fut = futmap.get(resource, None) 252 if fut is None: 253 raise RuntimeError("Resource {} not found in future-map: {}".format(resource, futmap)) 254 if resource.error is not None: 255 # Resource-level exception 256 fut.set_exception(KafkaException(resource.error)) 257 else: 258 # Resource-level success 259 # configs will be a dict for describe_configs() 260 # and None for alter_configs() 261 fut.set_result(configs) 262 except Exception as e: 263 # Request-level exception, raise the same for all resources 264 for resource, fut in futmap.items(): 265 fut.set_exception(e) 266 267 @staticmethod 268 def _make_futures(futmap_keys, class_check, make_result_fn): 269 """ 270 Create futures and a futuremap for the keys in futmap_keys, 271 and create a request-level future to be bassed to the C API. 272 """ 273 futmap = {} 274 for key in futmap_keys: 275 if class_check is not None and not isinstance(key, class_check): 276 raise ValueError("Expected list of {}".format(type(class_check))) 277 futmap[key] = concurrent.futures.Future() 278 if not futmap[key].set_running_or_notify_cancel(): 279 raise RuntimeError("Future was cancelled prematurely") 280 281 # Create an internal future for the entire request, 282 # this future will trigger _make_..._result() and set result/exception 283 # per topic,future in futmap. 284 f = concurrent.futures.Future() 285 f.add_done_callback(lambda f: make_result_fn(f, futmap)) 286 287 if not f.set_running_or_notify_cancel(): 288 raise RuntimeError("Future was cancelled prematurely") 289 290 return f, futmap 291 292 def create_topics(self, new_topics, **kwargs): 293 """ 294 Create new topics in cluster. 295 296 The future result() value is None. 297 298 :param list(NewTopic) new_topics: New topics to be created. 299 :param float operation_timeout: Set broker's operation timeout in seconds, 300 controlling how long the CreateTopics request will block 301 on the broker waiting for the topic creation to propagate 302 in the cluster. A value of 0 returns immediately. Default: 0 303 :param float request_timeout: Set the overall request timeout in seconds, 304 including broker lookup, request transmission, operation time 305 on broker, and response. Default: `socket.timeout.ms*1000.0` 306 :param bool validate_only: Tell broker to only validate the request, 307 without creating the topic. Default: False 308 309 :returns: a dict of futures for each topic, keyed by the topic name. 310 :rtype: dict(<topic_name, future>) 311 312 :raises KafkaException: Operation failed locally or on broker. 313 :raises TypeException: Invalid input. 314 :raises ValueException: Invalid input. 315 """ 316 317 f, futmap = AdminClient._make_futures([x.topic for x in new_topics], 318 None, 319 AdminClient._make_topics_result) 320 321 super(AdminClient, self).create_topics(new_topics, f, **kwargs) 322 323 return futmap 324 325 def delete_topics(self, topics, **kwargs): 326 """ 327 Delete topics. 328 329 The future result() value is None. 330 331 :param list(str) topics: Topics to mark for deletion. 332 :param float operation_timeout: Set broker's operation timeout in seconds, 333 controlling how long the DeleteTopics request will block 334 on the broker waiting for the topic deletion to propagate 335 in the cluster. A value of 0 returns immediately. Default: 0 336 :param float request_timeout: Set the overall request timeout in seconds, 337 including broker lookup, request transmission, operation time 338 on broker, and response. Default: `socket.timeout.ms*1000.0` 339 340 :returns: a dict of futures for each topic, keyed by the topic name. 341 :rtype: dict(<topic_name, future>) 342 343 :raises KafkaException: Operation failed locally or on broker. 344 :raises TypeException: Invalid input. 345 :raises ValueException: Invalid input. 346 """ 347 348 f, futmap = AdminClient._make_futures(topics, None, 349 AdminClient._make_topics_result) 350 351 super(AdminClient, self).delete_topics(topics, f, **kwargs) 352 353 return futmap 354 355 def create_partitions(self, new_partitions, **kwargs): 356 """ 357 Create additional partitions for the given topics. 358 359 The future result() value is None. 360 361 :param list(NewPartitions) new_partitions: New partitions to be created. 362 :param float operation_timeout: Set broker's operation timeout in seconds, 363 controlling how long the CreatePartitions request will block 364 on the broker waiting for the partition creation to propagate 365 in the cluster. A value of 0 returns immediately. Default: 0 366 :param float request_timeout: Set the overall request timeout in seconds, 367 including broker lookup, request transmission, operation time 368 on broker, and response. Default: `socket.timeout.ms*1000.0` 369 :param bool validate_only: Tell broker to only validate the request, 370 without creating the partitions. Default: False 371 372 :returns: a dict of futures for each topic, keyed by the topic name. 373 :rtype: dict(<topic_name, future>) 374 375 :raises KafkaException: Operation failed locally or on broker. 376 :raises TypeException: Invalid input. 377 :raises ValueException: Invalid input. 378 """ 379 380 f, futmap = AdminClient._make_futures([x.topic for x in new_partitions], 381 None, 382 AdminClient._make_topics_result) 383 384 super(AdminClient, self).create_partitions(new_partitions, f, **kwargs) 385 386 return futmap 387 388 def describe_configs(self, resources, **kwargs): 389 """ 390 Get configuration for the specified resources. 391 392 The future result() value is a dict(<configname, ConfigEntry>). 393 394 :warning: Multiple resources and resource types may be requested, 395 but at most one resource of type RESOURCE_BROKER is allowed 396 per call since these resource requests must be sent to the 397 broker specified in the resource. 398 399 :param list(ConfigResource) resources: Resources to get configuration for. 400 :param float request_timeout: Set the overall request timeout in seconds, 401 including broker lookup, request transmission, operation time 402 on broker, and response. Default: `socket.timeout.ms*1000.0` 403 :param bool validate_only: Tell broker to only validate the request, 404 without creating the partitions. Default: False 405 406 :returns: a dict of futures for each resource, keyed by the ConfigResource. 407 :rtype: dict(<ConfigResource, future>) 408 409 :raises KafkaException: Operation failed locally or on broker. 410 :raises TypeException: Invalid input. 411 :raises ValueException: Invalid input. 412 """ 413 414 f, futmap = AdminClient._make_futures(resources, ConfigResource, 415 AdminClient._make_resource_result) 416 417 super(AdminClient, self).describe_configs(resources, f, **kwargs) 418 419 return futmap 420 421 def alter_configs(self, resources, **kwargs): 422 """ 423 Update configuration values for the specified resources. 424 Updates are not transactional so they may succeed for a subset 425 of the provided resources while the others fail. 426 The configuration for a particular resource is updated atomically, 427 replacing the specified values while reverting unspecified configuration 428 entries to their default values. 429 430 The future result() value is None. 431 432 :warning: alter_configs() will replace all existing configuration for 433 the provided resources with the new configuration given, 434 reverting all other configuration for the resource back 435 to their default values. 436 437 :warning: Multiple resources and resource types may be specified, 438 but at most one resource of type RESOURCE_BROKER is allowed 439 per call since these resource requests must be sent to the 440 broker specified in the resource. 441 442 :param list(ConfigResource) resources: Resources to update configuration for. 443 :param float request_timeout: Set the overall request timeout in seconds, 444 including broker lookup, request transmission, operation time 445 on broker, and response. Default: `socket.timeout.ms*1000.0`. 446 :param bool validate_only: Tell broker to only validate the request, 447 without altering the configuration. Default: False 448 449 :returns: a dict of futures for each resource, keyed by the ConfigResource. 450 :rtype: dict(<ConfigResource, future>) 451 452 :raises KafkaException: Operation failed locally or on broker. 453 :raises TypeException: Invalid input. 454 :raises ValueException: Invalid input. 455 """ 456 457 f, futmap = AdminClient._make_futures(resources, ConfigResource, 458 AdminClient._make_resource_result) 459 460 super(AdminClient, self).alter_configs(resources, f, **kwargs) 461 462 return futmap 463 464 465class ClusterMetadata (object): 466 """ 467 ClusterMetadata as returned by list_topics() contains information 468 about the Kafka cluster, brokers, and topics. 469 470 This class is typically not user instantiated. 471 472 :ivar str cluster_id: Cluster id string, if supported by broker, else None. 473 :ivar id controller_id: Current controller broker id, or -1. 474 :ivar dict brokers: Map of brokers indexed by the int broker id. Value is BrokerMetadata object. 475 :ivar dict topics: Map of topics indexed by the topic name. Value is TopicMetadata object. 476 :ivar int orig_broker_id: The broker this metadata originated from. 477 :ivar str orig_broker_name: Broker name/address this metadata originated from. 478 """ 479 def __init__(self): 480 self.cluster_id = None 481 self.controller_id = -1 482 self.brokers = {} 483 self.topics = {} 484 self.orig_broker_id = -1 485 self.orig_broker_name = None 486 487 def __repr__(self): 488 return "ClusterMetadata({})".format(self.cluster_id) 489 490 def __str__(self): 491 return str(self.cluster_id) 492 493 494class BrokerMetadata (object): 495 """ 496 BrokerMetadata contains information about a Kafka broker. 497 498 This class is typically not user instantiated. 499 500 :ivar int id: Broker id. 501 :ivar str host: Broker hostname. 502 :ivar int port: Broker port. 503 """ 504 def __init__(self): 505 self.id = -1 506 self.host = None 507 self.port = -1 508 509 def __repr__(self): 510 return "BrokerMetadata({}, {}:{})".format(self.id, self.host, self.port) 511 512 def __str__(self): 513 return "{}:{}/{}".format(self.host, self.port, self.id) 514 515 516class TopicMetadata (object): 517 """ 518 TopicMetadata contains information about a Kafka topic. 519 520 This class is typically not user instantiated. 521 522 :ivar str topic: Topic name. 523 :ivar dict partitions: Map of partitions indexed by partition id. Value is PartitionMetadata object. 524 :ivar KafkaError error: Topic error, or None. Value is a KafkaError object. 525 """ 526 def __init__(self): 527 self.topic = None 528 self.partitions = {} 529 self.error = None 530 531 def __repr__(self): 532 if self.error is not None: 533 return "TopicMetadata({}, {} partitions, {})".format(self.topic, len(self.partitions), self.error) 534 else: 535 return "TopicMetadata({}, {} partitions)".format(self.topic, len(self.partitions)) 536 537 def __str__(self): 538 return self.topic 539 540 541class PartitionMetadata (object): 542 """ 543 PartitionsMetadata contains information about a Kafka partition. 544 545 This class is typically not user instantiated. 546 547 :ivar int id: Partition id. 548 :ivar int leader: Current leader broker for this partition, or -1. 549 :ivar list(int) replicas: List of replica broker ids for this partition. 550 :ivar list(int) isrs: List of in-sync-replica broker ids for this partition. 551 :ivar KafkaError error: Partition error, or None. Value is a KafkaError object. 552 553 :warning: Depending on cluster state the broker ids referenced in 554 leader, replicas and isrs may temporarily not be reported 555 in ClusterMetadata.brokers. Always check the availability 556 of a broker id in the brokers dict. 557 """ 558 def __init__(self): 559 self.id = -1 560 self.leader = -1 561 self.replicas = [] 562 self.isrs = [] 563 self.error = None 564 565 def __repr__(self): 566 if self.error is not None: 567 return "PartitionMetadata({}, {})".format(self.id, self.error) 568 else: 569 return "PartitionMetadata({})".format(self.id) 570 571 def __str__(self): 572 return "{}".format(self.id) 573