1#!/usr/bin/env python 2# 3# Copyright 2016 Confluent Inc. 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16# 17 18import argparse 19import time 20from confluent_kafka import Producer, KafkaException 21from verifiable_client import VerifiableClient 22 23 24class VerifiableProducer(VerifiableClient): 25 """ 26 confluent-kafka-python backed VerifiableProducer class for use with 27 Kafka's kafkatests client tests. 28 """ 29 def __init__(self, conf): 30 """ 31 conf is a config dict passed to confluent_kafka.Producer() 32 """ 33 super(VerifiableProducer, self).__init__(conf) 34 self.conf['on_delivery'] = self.dr_cb 35 self.producer = Producer(**self.conf) 36 self.num_acked = 0 37 self.num_sent = 0 38 self.num_err = 0 39 40 def dr_cb(self, err, msg): 41 """ Per-message Delivery report callback. Called from poll() """ 42 if err: 43 self.num_err += 1 44 self.send({'name': 'producer_send_error', 45 'message': str(err), 46 'topic': msg.topic(), 47 'key': msg.key(), 48 'value': msg.value()}) 49 else: 50 self.num_acked += 1 51 self.send({'name': 'producer_send_success', 52 'topic': msg.topic(), 53 'partition': msg.partition(), 54 'offset': msg.offset(), 55 'key': msg.key(), 56 'value': msg.value()}) 57 58 pass 59 60 61if __name__ == '__main__': 62 63 parser = argparse.ArgumentParser(description='Verifiable Python Producer') 64 parser.add_argument('--topic', type=str, required=True) 65 parser.add_argument('--throughput', type=int, default=0) 66 parser.add_argument('--broker-list', dest='conf_bootstrap.servers', required=True) 67 parser.add_argument('--max-messages', type=int, dest='max_msgs', default=1000000) # avoid infinite 68 parser.add_argument('--value-prefix', dest='value_prefix', type=str, default=None) 69 parser.add_argument('--acks', type=int, dest='topicconf_request.required.acks', default=-1) 70 parser.add_argument('--message-create-time', type=int, dest='create_time', default=0) 71 parser.add_argument('--producer.config', dest='producer_config') 72 parser.add_argument('-X', nargs=1, dest='extra_conf', action='append', help='Configuration property', default=[]) 73 args = vars(parser.parse_args()) 74 75 conf = {'broker.version.fallback': '0.9.0', 76 'produce.offset.report': True} 77 78 if args.get('producer_config', None) is not None: 79 args.update(VerifiableClient.read_config_file(args['producer_config'])) 80 81 args.update([x[0].split('=') for x in args.get('extra_conf', [])]) 82 83 VerifiableClient.set_config(conf, args) 84 85 vp = VerifiableProducer(conf) 86 87 vp.max_msgs = args['max_msgs'] 88 throughput = args['throughput'] 89 topic = args['topic'] 90 if args['value_prefix'] is not None: 91 value_fmt = args['value_prefix'] + '.%d' 92 else: 93 value_fmt = '%d' 94 95 if throughput > 0: 96 delay = 1.0/throughput 97 else: 98 delay = 0 99 100 vp.dbg('Producing %d messages at a rate of %d/s' % (vp.max_msgs, throughput)) 101 102 try: 103 for i in range(0, vp.max_msgs): 104 if not vp.run: 105 break 106 107 t_end = time.time() + delay 108 while vp.run: 109 try: 110 vp.producer.produce(topic, value=(value_fmt % i), 111 timestamp=args.get('create_time', 0)) 112 vp.num_sent += 1 113 except KafkaException as e: 114 vp.err('produce() #%d/%d failed: %s' % (i, vp.max_msgs, str(e))) 115 vp.num_err += 1 116 except BufferError: 117 vp.dbg('Local produce queue full (produced %d/%d msgs), waiting for deliveries..' % 118 (i, vp.max_msgs)) 119 vp.producer.poll(timeout=0.5) 120 continue 121 break 122 123 # Delay to achieve desired throughput, 124 # but make sure poll is called at least once 125 # to serve DRs. 126 while True: 127 remaining = max(0, t_end - time.time()) 128 vp.producer.poll(timeout=remaining) 129 if remaining <= 0.00000001: 130 break 131 132 except KeyboardInterrupt: 133 pass 134 135 # Flush remaining messages to broker. 136 vp.dbg('Flushing') 137 try: 138 vp.producer.flush(5) 139 except KeyboardInterrupt: 140 pass 141 142 vp.send({'name': 'shutdown_complete', '_qlen': len(vp.producer)}) 143 144 vp.dbg('All done') 145