1#!/usr/bin/env python
2#
3# Copyright 2016 Confluent Inc.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18import argparse
19import time
20from confluent_kafka import Producer, KafkaException
21from verifiable_client import VerifiableClient
22
23
class VerifiableProducer(VerifiableClient):
    """
    VerifiableProducer built on confluent-kafka-python, meant to be driven
    by Kafka's kafkatests client tests.
    """
    def __init__(self, conf):
        """
        conf is the client config dict. It is handed to the VerifiableClient
        constructor, after which self.conf (with the delivery callback
        added) is used to create the confluent_kafka.Producer().
        """
        super(VerifiableProducer, self).__init__(conf)

        # Produce/delivery bookkeeping.
        self.num_acked = 0
        self.num_sent = 0
        self.num_err = 0

        # Route every delivery report to dr_cb().
        self.conf['on_delivery'] = self.dr_cb
        self.producer = Producer(**self.conf)

    def dr_cb(self, err, msg):
        """
        Per-message delivery report callback, served from poll().

        Bumps the error or ack counter and emits the matching
        'producer_send_error' / 'producer_send_success' event via send().
        """
        if not err:
            self.num_acked += 1
            report = {'name': 'producer_send_success',
                      'topic': msg.topic(),
                      'partition': msg.partition(),
                      'offset': msg.offset(),
                      'key': msg.key(),
                      'value': msg.value()}
        else:
            self.num_err += 1
            report = {'name': 'producer_send_error',
                      'message': str(err),
                      'topic': msg.topic(),
                      'key': msg.key(),
                      'value': msg.value()}

        self.send(report)
59
60
def parse_extra_conf(extra_conf):
    """
    Convert the ``-X name=value`` command line properties to a dict.

    extra_conf is the list argparse collects for ``-X`` (nargs=1 combined
    with action='append'), i.e. a list of single-element lists such as
    [['linger.ms=10'], ['debug=broker,topic']].

    Only the first '=' separates the name from the value, so values that
    themselves contain '=' are kept intact. If a name is given more than
    once the last occurrence wins.

    Raises ValueError if a property has no '=' separator.
    """
    props = {}
    for item in extra_conf:
        prop = item[0]
        name, sep, value = prop.partition('=')
        if not sep:
            raise ValueError('Invalid -X property %r: expected name=value' % prop)
        props[name] = value
    return props


if __name__ == '__main__':

    parser = argparse.ArgumentParser(description='Verifiable Python Producer')
    parser.add_argument('--topic', type=str, required=True)
    parser.add_argument('--throughput', type=int, default=0)
    parser.add_argument('--broker-list', dest='conf_bootstrap.servers', required=True)
    parser.add_argument('--max-messages', type=int, dest='max_msgs', default=1000000)  # avoid infinite
    parser.add_argument('--value-prefix', dest='value_prefix', type=str, default=None)
    parser.add_argument('--acks', type=int, dest='topicconf_request.required.acks', default=-1)
    parser.add_argument('--message-create-time', type=int, dest='create_time', default=0)
    parser.add_argument('--producer.config', dest='producer_config')
    parser.add_argument('-X', nargs=1, dest='extra_conf', action='append', help='Configuration property', default=[])
    args = vars(parser.parse_args())

    conf = {'broker.version.fallback': '0.9.0',
            'produce.offset.report': True}

    # Settings from the optional config file are merged into args first...
    if args.get('producer_config', None) is not None:
        args.update(VerifiableClient.read_config_file(args['producer_config']))

    # ...then the -X properties, which therefore take precedence.
    args.update(parse_extra_conf(args.get('extra_conf', [])))

    # Transfer the collected settings from args into the client config dict.
    VerifiableClient.set_config(conf, args)

    vp = VerifiableProducer(conf)

    vp.max_msgs = args['max_msgs']
    throughput = args['throughput']
    topic = args['topic']
    # Loop-invariant, so look it up once rather than per message.
    create_time = args.get('create_time', 0)
    if args['value_prefix'] is not None:
        value_fmt = args['value_prefix'] + '.%d'
    else:
        value_fmt = '%d'

    # Per-message time budget; 0 means produce as fast as possible.
    if throughput > 0:
        delay = 1.0/throughput
    else:
        delay = 0

    vp.dbg('Producing %d messages at a rate of %d/s' % (vp.max_msgs, throughput))

    try:
        for i in range(0, vp.max_msgs):
            if not vp.run:
                break

            t_end = time.time() + delay
            # Retry the same message for as long as the local queue is full.
            while vp.run:
                try:
                    vp.producer.produce(topic, value=(value_fmt % i),
                                        timestamp=create_time)
                    vp.num_sent += 1
                except KafkaException as e:
                    # Counted as an error; the message is not retried.
                    vp.err('produce() #%d/%d failed: %s' % (i, vp.max_msgs, str(e)))
                    vp.num_err += 1
                except BufferError:
                    vp.dbg('Local produce queue full (produced %d/%d msgs), waiting for deliveries..' %
                           (i, vp.max_msgs))
                    vp.producer.poll(timeout=0.5)
                    continue
                break

            # Delay to achieve desired throughput,
            # but make sure poll is called at least once
            # to serve DRs.
            while True:
                remaining = max(0, t_end - time.time())
                vp.producer.poll(timeout=remaining)
                if remaining <= 0.00000001:
                    break

    except KeyboardInterrupt:
        pass

    # Flush remaining messages to broker.
    vp.dbg('Flushing')
    try:
        vp.producer.flush(5)
    except KeyboardInterrupt:
        pass

    # _qlen reports what len(producer) returns after the flush attempt.
    vp.send({'name': 'shutdown_complete', '_qlen': len(vp.producer)})

    vp.dbg('All done')
145