from __future__ import absolute_import

import atexit
import copy
import logging
import socket
import threading
import time
import weakref

from kafka.vendor import six

import kafka.errors as Errors
from kafka.client_async import KafkaClient, selectors
from kafka.codec import has_gzip, has_snappy, has_lz4
from kafka.metrics import MetricConfig, Metrics
from kafka.partitioner.default import DefaultPartitioner
from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
from kafka.producer.record_accumulator import AtomicInteger, RecordAccumulator
from kafka.producer.sender import Sender
from kafka.record.default_records import DefaultRecordBatchBuilder
from kafka.record.legacy_records import LegacyRecordBatchBuilder
from kafka.serializer import Serializer
from kafka.structs import TopicPartition


log = logging.getLogger(__name__)
PRODUCER_CLIENT_ID_SEQUENCE = AtomicInteger()


class KafkaProducer(object):
    """A Kafka client that publishes records to the Kafka cluster.

    The producer is thread safe and sharing a single producer instance across
    threads will generally be faster than having multiple instances.

    The producer consists of a pool of buffer space that holds records that
    haven't yet been transmitted to the server as well as a background I/O
    thread that is responsible for turning these records into requests and
    transmitting them to the cluster.

    :meth:`~kafka.KafkaProducer.send` is asynchronous. When called it adds the
    record to a buffer of pending record sends and immediately returns. This
    allows the producer to batch together individual records for efficiency.

    The 'acks' config controls the criteria under which requests are considered
    complete. The "all" setting will result in blocking on the full commit of
    the record, the slowest but most durable setting.

    If the request fails, the producer can automatically retry, unless
    'retries' is configured to 0. Enabling retries also opens up the
    possibility of duplicates (see the documentation on message
    delivery semantics for details:
    https://kafka.apache.org/documentation.html#semantics
    ).

    The producer maintains buffers of unsent records for each partition. These
    buffers are of a size specified by the 'batch_size' config. Making this
    larger can result in more batching, but requires more memory (since we will
    generally have one of these buffers for each active partition).

    By default a buffer is available to send immediately even if there is
    additional unused space in the buffer. However if you want to reduce the
    number of requests you can set 'linger_ms' to something greater than 0.
    This will instruct the producer to wait up to that number of milliseconds
    before sending a request in hope that more records will arrive to fill up
    the same batch. This is analogous to Nagle's algorithm in TCP. Note that
    records that arrive close together in time will generally batch together
    even with linger_ms=0 so under heavy load batching will occur regardless of
    the linger configuration; however setting this to something larger than 0
    can lead to fewer, more efficient requests when not under maximal load at
    the cost of a small amount of latency.

    The buffer_memory controls the total amount of memory available to the
    producer for buffering. If records are sent faster than they can be
    transmitted to the server then this buffer space will be exhausted. When
    the buffer space is exhausted additional send calls will block (for up to
    max_block_ms).

    The key_serializer and value_serializer instruct how to turn the key and
    value objects the user provides into bytes.

    Keyword Arguments:
        bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
            strings) that the producer should contact to bootstrap initial
            cluster metadata. This does not have to be the full node list.
            It just needs to have at least one broker that will respond to a
            Metadata API Request. Default port is 9092. If no servers are
            specified, will default to localhost:9092.
        client_id (str): a name for this client. This string is passed in
            each request to servers and can be used to identify specific
            server-side log entries that correspond to this client.
            Default: 'kafka-python-producer-#' (appended with a unique number
            per instance)
        key_serializer (callable): used to convert user-supplied keys to bytes.
            If not None, called as f(key), should return bytes. A
            kafka.serializer.Serializer instance may also be given, in which
            case f.serialize(topic, key) is called. Default: None.
        value_serializer (callable): used to convert user-supplied message
            values to bytes. If not None, called as f(value), should return
            bytes. A kafka.serializer.Serializer instance may also be given,
            in which case f.serialize(topic, value) is called. Default: None.
        acks (0, 1, 'all'): The number of acknowledgments the producer requires
            the leader to have received before considering a request complete.
            This controls the durability of records that are sent. The
            following settings are common:

            0: Producer will not wait for any acknowledgment from the server.
                The message will immediately be added to the socket
                buffer and considered sent. No guarantee can be made that the
                server has received the record in this case, and the retries
                configuration will not take effect (as the client won't
                generally know of any failures). The offset given back for each
                record will always be set to -1.
            1: Wait for leader to write the record to its local log only.
                Broker will respond without awaiting full acknowledgement from
                all followers. In this case should the leader fail immediately
                after acknowledging the record but before the followers have
                replicated it then the record will be lost.
            all: Wait for the full set of in-sync replicas to write the record.
                This guarantees that the record will not be lost as long as at
                least one in-sync replica remains alive. This is the strongest
                available guarantee.
            If unset, defaults to acks=1.
        compression_type (str): The compression type for all data generated by
            the producer. Valid values are 'gzip', 'snappy', 'lz4', or None.
            Compression is of full batches of data, so the efficacy of batching
            will also impact the compression ratio (more batching means better
            compression). Default: None.
        retries (int): Setting a value greater than zero will cause the client
            to resend any record whose send fails with a potentially transient
            error. Note that this retry is no different than if the client
            resent the record upon receiving the error. Allowing retries
            without setting max_in_flight_requests_per_connection to 1 will
            potentially change the ordering of records because if two batches
            are sent to a single partition, and the first fails and is retried
            but the second succeeds, then the records in the second batch may
            appear first.
            Default: 0.
        batch_size (int): Requests sent to brokers will contain multiple
            batches, one for each partition with data available to be sent.
            A small batch size will make batching less common and may reduce
            throughput (a batch size of zero will disable batching entirely).
            Default: 16384
        linger_ms (int): The producer groups together any records that arrive
            in between request transmissions into a single batched request.
            Normally this occurs only under load when records arrive faster
            than they can be sent out. However in some circumstances the client
            may want to reduce the number of requests even under moderate load.
            This setting accomplishes this by adding a small amount of
            artificial delay; that is, rather than immediately sending out a
            record the producer will wait for up to the given delay to allow
            other records to be sent so that the sends can be batched together.
            This can be thought of as analogous to Nagle's algorithm in TCP.
            This setting gives the upper bound on the delay for batching: once
            we get batch_size worth of records for a partition it will be sent
            immediately regardless of this setting, however if we have fewer
            than this many bytes accumulated for this partition we will
            'linger' for the specified time waiting for more records to show
            up. This setting defaults to 0 (i.e. no delay). Setting linger_ms=5
            would have the effect of reducing the number of requests sent but
            would add up to 5ms of latency to records sent in the absence of
            load. Default: 0.
        partitioner (callable): Callable used to determine which partition
            each message is assigned to. Called (after key serialization):
            partitioner(key_bytes, all_partitions, available_partitions).
            The default partitioner implementation hashes each non-None key
            using the same murmur2 algorithm as the java client so that
            messages with the same key are assigned to the same partition.
            When a key is None, the message is delivered to a random partition
            (filtered to partitions with available leaders only, if possible).
        buffer_memory (int): The total bytes of memory the producer should use
            to buffer records waiting to be sent to the server. If records are
            sent faster than they can be delivered to the server the producer
            will block up to max_block_ms, raising an exception on timeout.
            In the current implementation, this setting is an approximation.
            Default: 33554432 (32MB)
        connections_max_idle_ms: Close idle connections after the number of
            milliseconds specified by this config. The broker closes idle
            connections after connections.max.idle.ms, so this avoids hitting
            unexpected socket disconnected errors on the client.
            Default: 540000
        max_block_ms (int): Number of milliseconds to block during
            :meth:`~kafka.KafkaProducer.send` and
            :meth:`~kafka.KafkaProducer.partitions_for`. These methods can be
            blocked either because the buffer is full or metadata unavailable.
            Blocking in the user-supplied serializers or partitioner will not be
            counted against this timeout. Default: 60000.
        max_request_size (int): The maximum size of a request. This is also
            effectively a cap on the maximum record size. Note that the server
            has its own cap on record size which may be different from this.
            This setting will limit the number of record batches the producer
            will send in a single request to avoid sending huge requests.
            Default: 1048576.
        metadata_max_age_ms (int): The period of time in milliseconds after
            which we force a refresh of metadata even if we haven't seen any
            partition leadership changes to proactively discover any new
            brokers or partitions. Default: 300000
        retry_backoff_ms (int): Milliseconds to backoff when retrying on
            errors. Default: 100.
        request_timeout_ms (int): Client request timeout in milliseconds.
            Default: 30000.
        receive_buffer_bytes (int): The size of the TCP receive buffer
            (SO_RCVBUF) to use when reading data. Default: None (relies on
            system defaults). Java client defaults to 32768.
        send_buffer_bytes (int): The size of the TCP send buffer
            (SO_SNDBUF) to use when sending data. Default: None (relies on
            system defaults). Java client defaults to 131072.
        socket_options (list): List of tuple-arguments to socket.setsockopt
            to apply to broker connection sockets. Default:
            [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
        reconnect_backoff_ms (int): The amount of time in milliseconds to
            wait before attempting to reconnect to a given host.
            Default: 50.
        reconnect_backoff_max_ms (int): The maximum amount of time in
            milliseconds to wait when reconnecting to a broker that has
            repeatedly failed to connect. If provided, the backoff per host
            will increase exponentially for each consecutive connection
            failure, up to this maximum. To avoid connection storms, a
            randomization factor of 0.2 will be applied to the backoff
            resulting in a random range between 20% below and 20% above
            the computed value. Default: 1000.
        max_in_flight_requests_per_connection (int): Requests are pipelined
            to kafka brokers up to this number of maximum requests per
            broker connection. Note that if this setting is set to be greater
            than 1 and there are failed sends, there is a risk of message
            re-ordering due to retries (i.e., if retries are enabled).
            Default: 5.
        security_protocol (str): Protocol used to communicate with brokers.
            Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
            Default: PLAINTEXT.
        ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
            socket connections. If provided, all other ssl_* configurations
            will be ignored. Default: None.
        ssl_check_hostname (bool): flag to configure whether ssl handshake
            should verify that the certificate matches the brokers hostname.
            default: true.
        ssl_cafile (str): optional filename of ca file to use in certificate
            verification. default: none.
        ssl_certfile (str): optional filename of file in pem format containing
            the client certificate, as well as any ca certificates needed to
            establish the certificate's authenticity. default: none.
        ssl_keyfile (str): optional filename containing the client private key.
            default: none.
        ssl_password (str): optional password to be used when loading the
            certificate chain. default: none.
        ssl_crlfile (str): optional filename containing the CRL to check for
            certificate expiration. By default, no CRL check is done. When
            providing a file, only the leaf certificate will be checked against
            this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+.
            default: none.
        api_version (tuple): Specify which Kafka API version to use. If set to
            None, the client will attempt to infer the broker version by probing
            various APIs. Example: (0, 10, 2). Default: None
        api_version_auto_timeout_ms (int): number of milliseconds to throw a
            timeout exception from the constructor when checking the broker
            api version. Only applies if api_version is None (or the
            deprecated string 'auto').
        metric_reporters (list): A list of classes to use as metrics reporters.
            Implementing the AbstractMetricsReporter interface allows plugging
            in classes that will be notified of new metric creation. Default: []
        metrics_num_samples (int): The number of samples maintained to compute
            metrics. Default: 2
        metrics_sample_window_ms (int): The maximum age in milliseconds of
            samples used to compute metrics. Default: 30000
        selector (selectors.BaseSelector): Provide a specific selector
            implementation to use for I/O multiplexing.
            Default: selectors.DefaultSelector
        sasl_mechanism (str): string picking sasl mechanism when security_protocol
            is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
            Default: None
        sasl_plain_username (str): username for sasl PLAIN authentication.
            Default: None
        sasl_plain_password (str): password for sasl PLAIN authentication.
            Default: None
        sasl_kerberos_service_name (str): Service name to include in GSSAPI
            sasl mechanism handshake. Default: 'kafka'
        sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
            sasl mechanism handshake. Default: one of bootstrap servers

    Note:
        Configuration parameters are described in more detail at
        https://kafka.apache.org/0100/configuration.html#producerconfigs
    """
    DEFAULT_CONFIG = {
        'bootstrap_servers': 'localhost',
        'client_id': None,
        'key_serializer': None,
        'value_serializer': None,
        'acks': 1,
        'bootstrap_topics_filter': set(),
        'compression_type': None,
        'retries': 0,
        'batch_size': 16384,
        'linger_ms': 0,
        'partitioner': DefaultPartitioner(),
        'buffer_memory': 33554432,
        'connections_max_idle_ms': 9 * 60 * 1000,
        'max_block_ms': 60000,
        'max_request_size': 1048576,
        'metadata_max_age_ms': 300000,
        'retry_backoff_ms': 100,
        'request_timeout_ms': 30000,
        'receive_buffer_bytes': None,
        'send_buffer_bytes': None,
        'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
        'sock_chunk_bytes': 4096,  # undocumented experimental option
        'sock_chunk_buffer_count': 1000,  # undocumented experimental option
        'reconnect_backoff_ms': 50,
        'reconnect_backoff_max_ms': 1000,
        'max_in_flight_requests_per_connection': 5,
        'security_protocol': 'PLAINTEXT',
        'ssl_context': None,
        'ssl_check_hostname': True,
        'ssl_cafile': None,
        'ssl_certfile': None,
        'ssl_keyfile': None,
        'ssl_crlfile': None,
        'ssl_password': None,
        'api_version': None,
        'api_version_auto_timeout_ms': 2000,
        'metric_reporters': [],
        'metrics_num_samples': 2,
        'metrics_sample_window_ms': 30000,
        'selector': selectors.DefaultSelector,
        'sasl_mechanism': None,
        'sasl_plain_username': None,
        'sasl_plain_password': None,
        'sasl_kerberos_service_name': 'kafka',
        'sasl_kerberos_domain_name': None
    }

    # Maps compression_type -> (library-availability checker, codec attrs).
    _COMPRESSORS = {
        'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP),
        'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY),
        'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4),
        None: (lambda: True, LegacyRecordBatchBuilder.CODEC_NONE),
    }

    def __init__(self, **configs):
        log.debug("Starting the Kafka producer")  # trace
        self.config = copy.copy(self.DEFAULT_CONFIG)
        for key in self.config:
            if key in configs:
                self.config[key] = configs.pop(key)

        # Only check for extra config keys in top-level class
        assert not configs, 'Unrecognized configs: %s' % (configs,)

        if self.config['client_id'] is None:
            self.config['client_id'] = 'kafka-python-producer-%s' % \
                (PRODUCER_CLIENT_ID_SEQUENCE.increment(),)

        if self.config['acks'] == 'all':
            self.config['acks'] = -1

        # api_version was previously a str. accept old format for now.
        # six.string_types also matches unicode strings on Python 2.
        if isinstance(self.config['api_version'], six.string_types):
            deprecated = self.config['api_version']
            if deprecated == 'auto':
                self.config['api_version'] = None
            else:
                self.config['api_version'] = tuple(map(int, deprecated.split('.')))
            log.warning('use api_version=%s [tuple] -- "%s" as str is deprecated',
                        str(self.config['api_version']), deprecated)

        # Configure metrics
        metrics_tags = {'client-id': self.config['client_id']}
        metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
                                     time_window_ms=self.config['metrics_sample_window_ms'],
                                     tags=metrics_tags)
        reporters = [reporter() for reporter in self.config['metric_reporters']]
        self._metrics = Metrics(metric_config, reporters)

        client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
                             wakeup_timeout_ms=self.config['max_block_ms'],
                             **self.config)

        # Get auto-discovered version from client if necessary
        if self.config['api_version'] is None:
            self.config['api_version'] = client.config['api_version']

        if self.config['compression_type'] == 'lz4':
            assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'

        # Check compression_type for library support
        ct = self.config['compression_type']
        if ct not in self._COMPRESSORS:
            raise ValueError("Not supported codec: {}".format(ct))
        else:
            checker, compression_attrs = self._COMPRESSORS[ct]
            assert checker(), "Libraries for {} compression codec not found".format(ct)
            self.config['compression_attrs'] = compression_attrs

        message_version = self._max_usable_produce_magic()
        self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config)
        self._metadata = client.cluster
        guarantee_message_order = self.config['max_in_flight_requests_per_connection'] == 1
        self._sender = Sender(client, self._metadata,
                              self._accumulator, self._metrics,
                              guarantee_message_order=guarantee_message_order,
                              **self.config)
        self._sender.daemon = True
        self._sender.start()
        self._closed = False

        self._cleanup = self._cleanup_factory()
        atexit.register(self._cleanup)
        log.debug("Kafka producer started")

    def _cleanup_factory(self):
        """Build a cleanup closure that doesn't increase our ref count.

        The closure holds only a weakref.proxy to this producer, so
        registering it with atexit does not keep the producer alive. If the
        producer has already been collected, the resulting ReferenceError
        (or AttributeError) is swallowed.
        """
        _self = weakref.proxy(self)

        def wrapper():
            try:
                _self.close(timeout=0)
            except (ReferenceError, AttributeError):
                pass
        return wrapper

    def _unregister_cleanup(self):
        """Remove this producer's atexit handler, if one is registered."""
        if getattr(self, '_cleanup', None):
            if hasattr(atexit, 'unregister'):
                atexit.unregister(self._cleanup)  # pylint: disable=no-member

            # py2 requires removing from private attribute...
            else:

                # ValueError on list.remove() if the exithandler no longer exists
                # but that is fine here
                try:
                    atexit._exithandlers.remove(  # pylint: disable=no-member
                        (self._cleanup, (), {}))
                except ValueError:
                    pass
        self._cleanup = None

    def __del__(self):
        """Best-effort, non-blocking close when the producer is collected."""
        self.close(timeout=0)

    def close(self, timeout=None):
        """Close this producer.

        Arguments:
            timeout (float, optional): timeout in seconds to wait for completion.
                If None, waits up to threading.TIMEOUT_MAX (or forever where
                that constant is unavailable).
        """

        # drop our atexit handler now to avoid leaks
        self._unregister_cleanup()

        # hasattr guards against __init__ having failed before _closed was set
        if not hasattr(self, '_closed') or self._closed:
            log.info('Kafka producer closed')
            return
        if timeout is None:
            # threading.TIMEOUT_MAX is available in Python3.3+
            timeout = getattr(threading, 'TIMEOUT_MAX', float('inf'))
        if getattr(threading, 'TIMEOUT_MAX', False):
            assert 0 <= timeout <= getattr(threading, 'TIMEOUT_MAX')
        else:
            assert timeout >= 0

        log.info("Closing the Kafka producer with %s secs timeout.", timeout)
        invoked_from_callback = bool(threading.current_thread() is self._sender)
        if timeout > 0:
            if invoked_from_callback:
                log.warning("Overriding close timeout %s secs to 0 in order to"
                            " prevent useless blocking due to self-join. This"
                            " means you have incorrectly invoked close with a"
                            " non-zero timeout from the producer call-back.",
                            timeout)
                # Actually apply the override announced above so later log
                # messages report the effective timeout.
                timeout = 0
            else:
                # Try to close gracefully.
                if self._sender is not None:
                    self._sender.initiate_close()
                    self._sender.join(timeout)

        if self._sender is not None and self._sender.is_alive():

            log.info("Proceeding to force close the producer since pending"
                     " requests could not be completed within timeout %s.",
                     timeout)
            self._sender.force_close()
            # Only join the sender thread when not calling from callback.
            if not invoked_from_callback:
                self._sender.join()

        self._metrics.close()
        # Serializers are not required to have a close() method.
        try:
            self.config['key_serializer'].close()
        except AttributeError:
            pass
        try:
            self.config['value_serializer'].close()
        except AttributeError:
            pass
        self._closed = True
        log.debug("The Kafka producer has closed.")

    def partitions_for(self, topic):
        """Returns set of all known partitions for the topic.

        Blocks for up to max_block_ms waiting for topic metadata and raises
        KafkaTimeoutError if it is not obtained in time.
        """
        max_wait = self.config['max_block_ms'] / 1000.0
        return self._wait_on_metadata(topic, max_wait)

    def _max_usable_produce_magic(self):
        """Return the record format 'magic' usable with the configured api_version.

        2 for api_version >= (0, 11), 1 for >= (0, 10), otherwise 0.
        """
        if self.config['api_version'] >= (0, 11):
            return 2
        elif self.config['api_version'] >= (0, 10):
            return 1
        else:
            return 0

    def _estimate_size_in_bytes(self, key, value, headers=None):
        """Estimate the serialized size of a single record.

        Arguments:
            key (bytes or None): serialized key
            value (bytes or None): serialized value
            headers (list, optional): (str, bytes) header tuples; only used
                for magic 2. Defaults to no headers.

        Returns:
            int: estimated size in bytes
        """
        # Avoid a shared mutable default argument.
        if headers is None:
            headers = []
        magic = self._max_usable_produce_magic()
        if magic == 2:
            return DefaultRecordBatchBuilder.estimate_size_in_bytes(
                key, value, headers)
        else:
            return LegacyRecordBatchBuilder.estimate_size_in_bytes(
                magic, self.config['compression_type'], key, value)

    def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
        """Publish a message to a topic.

        Arguments:
            topic (str): topic where the message will be published
            value (optional): message value. Must be type bytes, or be
                serializable to bytes via configured value_serializer. If value
                is None, key is required and message acts as a 'delete'.
                See kafka compaction documentation for more details:
                https://kafka.apache.org/documentation.html#compaction
                (compaction requires kafka >= 0.8.1)
            partition (int, optional): optionally specify a partition. If not
                set, the partition will be selected using the configured
                'partitioner'.
            key (optional): a key to associate with the message. Can be used to
                determine which partition to send the message to. If partition
                is None (and producer's partitioner config is left as default),
                then messages with the same key will be delivered to the same
                partition (but if key is None, partition is chosen randomly).
                Must be type bytes, or be serializable to bytes via configured
                key_serializer.
            headers (optional): a list of header key value pairs. List items
                are tuples of str key and bytes value.
            timestamp_ms (int, optional): epoch milliseconds (from Jan 1 1970 UTC)
                to use as the message timestamp. Defaults to current time.

        Returns:
            FutureRecordMetadata: resolves to RecordMetadata

        Raises:
            KafkaTimeoutError: if unable to fetch topic metadata, or unable
                to obtain memory buffer prior to configured max_block_ms
        """
        assert value is not None or self.config['api_version'] >= (0, 8, 1), (
            'Null messages require kafka >= 0.8.1')
        assert not (value is None and key is None), 'Need at least one: key or value'
        key_bytes = value_bytes = None
        try:
            self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)

            key_bytes = self._serialize(
                self.config['key_serializer'],
                topic, key)
            value_bytes = self._serialize(
                self.config['value_serializer'],
                topic, value)
            assert type(key_bytes) in (bytes, bytearray, memoryview, type(None))
            assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))

            partition = self._partition(topic, partition, key, value,
                                        key_bytes, value_bytes)

            if headers is None:
                headers = []
            assert type(headers) == list
            assert all(type(item) == tuple and len(item) == 2 and type(item[0]) == str and type(item[1]) == bytes for item in headers)

            message_size = self._estimate_size_in_bytes(key_bytes, value_bytes, headers)
            self._ensure_valid_record_size(message_size)

            tp = TopicPartition(topic, partition)
            log.debug("Sending (key=%r value=%r headers=%r) to %s", key, value, headers, tp)
            result = self._accumulator.append(tp, timestamp_ms,
                                              key_bytes, value_bytes, headers,
                                              self.config['max_block_ms'],
                                              estimated_size=message_size)
            future, batch_is_full, new_batch_created = result
            if batch_is_full or new_batch_created:
                log.debug("Waking up the sender since %s is either full or"
                          " getting a new batch", tp)
                self._sender.wakeup()

            return future
            # handling exceptions and record the errors;
            # for API exceptions return them in the future,
            # for other exceptions raise directly
        except Errors.BrokerResponseError as e:
            log.debug("Exception occurred during message send: %s", e)
            return FutureRecordMetadata(
                FutureProduceResult(TopicPartition(topic, partition)),
                -1, None, None,
                len(key_bytes) if key_bytes is not None else -1,
                len(value_bytes) if value_bytes is not None else -1,
                sum(len(h_key.encode("utf-8")) + len(h_value) for h_key, h_value in headers) if headers else -1,
            ).failure(e)

    def flush(self, timeout=None):
        """
        Invoking this method makes all buffered records immediately available
        to send (even if linger_ms is greater than 0) and blocks on the
        completion of the requests associated with these records. The
        post-condition of :meth:`~kafka.KafkaProducer.flush` is that any
        previously sent record will have completed
        (e.g. Future.is_done() == True). A request is considered completed when
        either it is successfully acknowledged according to the 'acks'
        configuration for the producer, or it results in an error.

        Other threads can continue sending messages while one thread is blocked
        waiting for a flush call to complete; however, no guarantee is made
        about the completion of messages sent after the flush call begins.

        Arguments:
            timeout (float, optional): timeout in seconds to wait for completion.

        Raises:
            KafkaTimeoutError: failure to flush buffered records within the
                provided timeout
        """
        log.debug("Flushing accumulated records in producer.")  # trace
        self._accumulator.begin_flush()
        self._sender.wakeup()
        self._accumulator.await_flush_completion(timeout=timeout)

    def _ensure_valid_record_size(self, size):
        """Validate that the record size isn't too large."""
        if size > self.config['max_request_size']:
            raise Errors.MessageSizeTooLargeError(
                "The message is %d bytes when serialized which is larger than"
                " the maximum request size you have configured with the"
                " max_request_size configuration" % (size,))
        if size > self.config['buffer_memory']:
            raise Errors.MessageSizeTooLargeError(
                "The message is %d bytes when serialized which is larger than"
                " the total memory buffer you have configured with the"
                " buffer_memory configuration." % (size,))

    def _wait_on_metadata(self, topic, max_wait):
        """
        Wait for cluster metadata including partitions for the given topic to
        be available.

        Arguments:
            topic (str): topic we want metadata for
            max_wait (float): maximum time in secs for waiting on the metadata

        Returns:
            set: partition ids for the topic

        Raises:
            KafkaTimeoutError: if partitions for topic were not obtained before
                specified max_wait timeout
            TopicAuthorizationFailedError: if the topic is reported as
                unauthorized after a metadata update
        """
        # add topic to metadata topic list if it is not there already.
        self._sender.add_topic(topic)
        begin = time.time()
        elapsed = 0.0
        metadata_event = None
        while True:
            partitions = self._metadata.partitions_for_topic(topic)
            if partitions is not None:
                return partitions

            if not metadata_event:
                metadata_event = threading.Event()

            log.debug("Requesting metadata update for topic %s", topic)

            metadata_event.clear()
            future = self._metadata.request_update()
            future.add_both(lambda e, *args: e.set(), metadata_event)
            self._sender.wakeup()
            # Never pass a negative timeout once the budget is exhausted;
            # a zero wait just polls the event.
            metadata_event.wait(max(0.0, max_wait - elapsed))
            elapsed = time.time() - begin
            if not metadata_event.is_set():
                raise Errors.KafkaTimeoutError(
                    "Failed to update metadata after %.1f secs." % (max_wait,))
            elif topic in self._metadata.unauthorized_topics:
                raise Errors.TopicAuthorizationFailedError(topic)
            else:
                log.debug("_wait_on_metadata woke after %s secs.", elapsed)

    def _serialize(self, f, topic, data):
        """Serialize data with f.

        A falsy f returns data unchanged; a Serializer instance is called as
        f.serialize(topic, data); any other callable as f(data).
        """
        if not f:
            return data
        if isinstance(f, Serializer):
            return f.serialize(topic, data)
        return f(data)

    def _partition(self, topic, partition, key, value,
                   serialized_key, serialized_value):
        """Choose the partition for a record.

        An explicitly requested partition is validated against the known
        partitions for the topic and returned as-is. Otherwise the configured
        partitioner is called with the serialized key, the sorted list of all
        partitions, and the list of available partitions.
        """
        if partition is not None:
            assert partition >= 0
            assert partition in self._metadata.partitions_for_topic(topic), 'Unrecognized partition'
            return partition

        all_partitions = sorted(self._metadata.partitions_for_topic(topic))
        available = list(self._metadata.available_partitions_for_topic(topic))
        return self.config['partitioner'](serialized_key,
                                          all_partitions,
                                          available)

    def metrics(self, raw=False):
        """Get metrics on producer performance.

        This is ported from the Java Producer, for details see:
        https://kafka.apache.org/documentation/#producer_monitoring

        Arguments:
            raw (bool): if True, return a copy of the underlying metrics
                mapping; otherwise a nested {group: {name: value}} dict.

        Warning:
            This is an unstable interface. It may change in future
            releases without warning.
        """
        if raw:
            return self._metrics.metrics.copy()

        metrics = {}
        for k, v in six.iteritems(self._metrics.metrics.copy()):
            metrics.setdefault(k.group, {})[k.name] = v.value()
        return metrics