1#!/usr/bin/env python3
2#    Licensed under the Apache License, Version 2.0 (the "License"); you may
3#    not use this file except in compliance with the License. You may obtain
4#    a copy of the License at
5#
6#         http://www.apache.org/licenses/LICENSE-2.0
7#
8#    Unless required by applicable law or agreed to in writing, software
9#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
10#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
11#    License for the specific language governing permissions and limitations
12#    under the License.
13
try:
    # Import and monkey-patch eventlet before anything else so that the
    # whole process (sockets, threading, time) is green-thread aware.
    # The try/except wrapper exists only to placate the linter, see
    # https://github.com/PyCQA/pycodestyle/issues/472 — the error is
    # still propagated unchanged.
    import eventlet
    eventlet.monkey_patch()
except ImportError:
    raise
20
21import argparse
22import bisect
23import collections
24import functools
25import itertools
26import json
27import logging
28import os
29import random
30import signal
31import socket
32import string
33import sys
34import threading
35import time
36import yaml
37
38from oslo_config import cfg
39import oslo_messaging as messaging
40from oslo_messaging import notify  # noqa
41from oslo_messaging import rpc  # noqa
42from oslo_utils import timeutils
43
LOG = logging.getLogger()  # root logger; configured in _setup_logging()
CURRENT_PID = None   # pid of this process (set in the __main__ guard)
CURRENT_HOST = None  # hostname of this machine (set in the __main__ guard)
CLIENTS = []         # every spawned client, for final stats reporting
MESSAGES = []        # pre-generated random message payloads
IS_RUNNING = True    # global run flag; cleared on signal or timer expiry
SERVERS = []         # running oslo.messaging servers, stopped on signal
TRANSPORT = None     # the oslo.messaging transport; cleaned up on exit

USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\
 {notify-server,notify-client,rpc-server,rpc-client} ...

Usage example:
 python tools/simulator.py\
 --url rabbit://stackrabbit:secretrabbit@localhost/ rpc-server
 python tools/simulator.py\
 --url rabbit://stackrabbit:secretrabbit@localhost/ rpc-client\
 --exit-wait 15000 -p 64 -m 64"""

# Cap on the number of distinct random messages generated per run.
MESSAGES_LIMIT = 1000
# Width (in characters) of each message-length histogram bucket.
DISTRIBUTION_BUCKET_SIZE = 500
65
66
def init_random_generator():
    """Build a closure producing random message lengths.

    Lengths follow the empirical distribution recorded in
    messages_length.yaml: samples are grouped into buckets of
    DISTRIBUTION_BUCKET_SIZE characters, a cumulative distribution is
    built over the buckets, and lengths are drawn by inverse sampling.
    """
    file_dir = os.path.dirname(os.path.abspath(__file__))
    with open(os.path.join(file_dir, 'messages_length.yaml')) as m_file:
        content = yaml.safe_load(m_file)
        lengths = [int(n) for n in
                   content['test_data']['string_lengths'].split(', ')]

    # Histogram: bucket start -> number of samples falling in the bucket.
    buckets = collections.defaultdict(int)
    for msg_length in lengths:
        bucket_start = (msg_length // DISTRIBUTION_BUCKET_SIZE *
                        DISTRIBUTION_BUCKET_SIZE + 1)
        buckets[bucket_start] += 1

    bucket_starts = sorted(buckets)
    total = len(lengths)

    # Cumulative normalized distribution, index-aligned with bucket_starts.
    cumulative = []
    acc = 0
    for bucket_start in bucket_starts:
        acc += float(buckets[bucket_start]) / total
        cumulative.append(acc)

    def weighted_random_choice():
        # Inverse transform sampling: find the first bucket whose
        # cumulative weight exceeds r, then pick uniformly inside it.
        r = random.random() * acc
        start = bucket_starts[bisect.bisect_right(cumulative, r)]
        return random.randrange(start, start + DISTRIBUTION_BUCKET_SIZE)

    return weighted_random_choice
97
98
class LoggingNoParsingFilter(logging.Filter):
    """Suppress noisy per-message log records.

    Records whose formatted message contains one of the known markers
    are dropped; everything else passes through.
    """

    _NOISY_MARKERS = ('received {', 'MSG_ID is ')

    def filter(self, record):
        message = record.getMessage()
        return not any(marker in message for marker in self._NOISY_MARKERS)
106
107
# Record describing one simulator message plus the timestamps collected
# at each hop: client send, server receive, and reply arrival.
Message = collections.namedtuple(
    'Message', ['seq', 'cargo', 'client_ts', 'server_ts', 'return_ts'])


def make_message(seq, cargo, client_ts=0, server_ts=0, return_ts=0):
    """Build a Message; timestamps not yet known default to zero."""
    return Message(seq=seq, cargo=cargo, client_ts=client_ts,
                   server_ts=server_ts, return_ts=return_ts)


def update_message(message, **kwargs):
    """Return a copy of *message* with the given fields replaced.

    *message* may be any 5-item iterable (e.g. a list deserialized from
    the transport), not only a Message instance.
    """
    return Message(*message)._replace(**kwargs)
118
119
class MessageStatsCollector(object):
    """Collect per-second statistics about processed messages.

    Messages are pushed into a buffer; a self-rescheduling
    threading.Timer chain (started in __init__) drains the buffer once a
    second and appends a summary point to ``self.series``.
    """

    def __init__(self, label):
        self.label = label
        self.buffer = []  # buffer to store messages during report interval
        self.series = []  # stats for every report interval

        now = time.time()
        diff = int(now) - now + 1  # align start to whole seconds
        threading.Timer(diff, self.monitor).start()  # schedule in a second

    def monitor(self):
        """Drain the buffer into one stats point; reschedule while running."""
        global IS_RUNNING
        if IS_RUNNING:
            # NOTE(kbespalov): this way not properly works
            # because the monitor starting with range 1sec +-150 ms
            # due to high threading contention between rpc clients
            threading.Timer(1.0, self.monitor).start()
        now = time.time()

        # Snapshot the current length; messages pushed while we iterate
        # are left for the next interval.
        count = len(self.buffer)

        size = 0
        min_latency = sys.maxsize
        max_latency = 0
        sum_latencies = 0

        for i in range(count):
            p = self.buffer[i]
            size += len(p.cargo)

            latency = None
            if p.return_ts:
                latency = p.return_ts - p.client_ts  # round-trip
            elif p.server_ts:
                latency = p.server_ts - p.client_ts  # client -> server

            if latency:
                sum_latencies += latency
                min_latency = min(min_latency, latency)
                max_latency = max(max_latency, latency)

        del self.buffer[:count]  # trim processed items

        seq = len(self.series)
        stats = dict(seq=seq, timestamp=now, count=count, size=size)
        msg = ('%-14s: seq: %-4d count: %-6d bytes: %-10d' %
               (self.label, seq, count, size))

        if sum_latencies:
            # NOTE(review): average divides by the total message count,
            # not by the number of messages that carried a latency.
            latency = sum_latencies / count
            stats.update(dict(latency=latency,
                              min_latency=min_latency,
                              max_latency=max_latency))
            msg += (' latency: %-9.3f min: %-9.3f max: %-9.3f' %
                    (latency, min_latency, max_latency))

        self.series.append(stats)
        LOG.info(msg)

    def push(self, parsed_message):
        # Called from client/server greenthreads; list.append is atomic
        # enough for this measurement tool's purposes.
        self.buffer.append(parsed_message)

    def get_series(self):
        """Return the list of per-second stats points collected so far."""
        return self.series

    @staticmethod
    def calc_stats(label, *collectors):
        """Aggregate the series of several collectors into one summary.

        Returns a dict with totals, duration, throughput (msg/sec and
        bytes/sec) and, when latency data is present, avg/min/max
        latency. Also logs the summary line under *label*.
        """
        count = 0
        size = 0
        min_latency = sys.maxsize
        max_latency = 0
        sum_latencies = 0
        start = sys.maxsize
        end = 0

        for point in itertools.chain(*(c.get_series() for c in collectors)):
            count += point['count']
            size += point['size']
            if point['count']:
                # NOTE(kbespalov):
                # we take the start and end time as the time of the
                # first and last processed message, no reason
                # to set boundaries if server was idle before
                # running of clients and after.
                start = min(start, point['timestamp'])
                end = max(end, point['timestamp'])

            if 'latency' in point:
                sum_latencies += point['latency'] * point['count']
                min_latency = min(min_latency, point['min_latency'])
                max_latency = max(max_latency, point['max_latency'])

        # start is the timestamp of the earliest block, which includes
        # samples for the prior second
        start -= 1
        duration = end - start if count else 0
        stats = dict(count=count, size=size, duration=duration, count_p_s=0,
                     size_p_s=0)
        if duration:
            stats.update(dict(start=start, end=end,
                              count_p_s=count / duration,
                              size_p_s=size / duration))

        msg = ('%s: duration: %.2f count: %d (%.1f msg/sec) '
               'bytes: %d (%.0f bps)' %
               (label, duration, count, stats['count_p_s'],
                size, stats['size_p_s']))

        if sum_latencies:
            latency = sum_latencies / count
            stats.update(dict(latency=latency,
                              min_latency=min_latency,
                              max_latency=max_latency))
            msg += (' latency: %.3f min: %.3f max: %.3f' %
                    (latency, min_latency, max_latency))

        LOG.info(msg)
        return stats
238
239
class NotifyEndpoint(object):
    """Notification listener endpoint that records message statistics.

    Optionally requeues each message once (by seq number) and can delay
    its answer to simulate slow processing.
    """

    def __init__(self, wait_before_answer, requeue):
        self.wait_before_answer = wait_before_answer
        self.requeue = requeue
        self.received_messages = MessageStatsCollector('server')
        # seq numbers already requeued once, so a message is never
        # requeued a second time
        self.cache = set()

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        LOG.debug("%s %s %s %s", ctxt, publisher_id, event_type, payload)

        message = update_message(payload, server_ts=time.time())
        self.received_messages.push(message)

        first_time_seen = message.seq not in self.cache
        if self.requeue and first_time_seen:
            self.cache.add(message.seq)

            if self.wait_before_answer > 0:
                time.sleep(self.wait_before_answer)

            return messaging.NotificationResult.REQUEUE

        return messaging.NotificationResult.HANDLED
264
265
def notify_server(transport, topic, wait_before_answer, duration, requeue):
    """Run a notification listener on *topic* until *duration* expires.

    Returns the endpoint so the caller can read collected statistics.
    """
    endpoint = NotifyEndpoint(wait_before_answer, requeue)
    target = messaging.Target(topic=topic)
    listener = notify.get_notification_listener(
        transport, [target], [endpoint], executor='eventlet')
    run_server(listener, duration=duration)
    return endpoint
274
275
class BatchNotifyEndpoint(object):
    """Batch notification endpoint that records message statistics.

    ``wait_before_answer`` and ``requeue`` are stored for interface
    parity with NotifyEndpoint but are not used in batch mode.
    """

    def __init__(self, wait_before_answer, requeue):
        self.wait_before_answer = wait_before_answer
        self.requeue = requeue
        self.received_messages = MessageStatsCollector('server')
        self.cache = set()

    def info(self, batch):
        LOG.debug('msg rcv')
        LOG.debug("%s", batch)

        # One receive timestamp is shared by the whole batch.
        server_ts = time.time()

        for item in batch:
            self.received_messages.push(
                update_message(item['payload'], server_ts=server_ts))

        return messaging.NotificationResult.HANDLED
294
295
def batch_notify_server(transport, topic, wait_before_answer, duration,
                        requeue):
    """Run a batch notification listener and return its stats endpoint."""
    endpoint = BatchNotifyEndpoint(wait_before_answer, requeue)
    listener = notify.get_batch_notification_listener(
        transport, [messaging.Target(topic=topic)],
        [endpoint], executor='eventlet',
        batch_size=1000, batch_timeout=5)
    run_server(listener, duration=duration)
    return endpoint
307
308
class RpcEndpoint(object):
    """RPC server endpoint: echoes each message back, stamped with the
    server-side receive timestamp."""

    def __init__(self, wait_before_answer):
        self.wait_before_answer = wait_before_answer
        self.received_messages = MessageStatsCollector('server')

    def info(self, ctxt, message):
        received_at = time.time()

        LOG.debug("######## RCV: %s", message)

        reply = update_message(message, server_ts=received_at)
        self.received_messages.push(reply)

        # Optional artificial processing delay.
        if self.wait_before_answer > 0:
            time.sleep(self.wait_before_answer)

        return reply
326
327
class ServerControlEndpoint(object):
    """RPC endpoint letting clients remotely start/stop the server.

    Clients register via ``sync_start`` and report completion via
    ``sync_done``; once the last registered client finishes, the
    controlled server is stopped after a short delay so the final reply
    can still be delivered.
    """

    def __init__(self, controlled_server):
        self.connected_clients = set()
        self.controlled_server = controlled_server

    def sync_start(self, ctx, message):
        """Handle start reports from clients"""

        client_id = message['id']
        LOG.info('The client %s started to send messages' % client_id)
        self.connected_clients.add(client_id)

    def sync_done(self, ctx, message):
        """Handle done reports from clients"""

        client_id = message['id']
        LOG.info('The client %s finished msg sending.' % client_id)

        self.connected_clients.discard(client_id)

        if not self.connected_clients:
            LOG.info(
                'The clients sent all messages. Shutting down the server..')
            threading.Timer(1, self._stop_server_with_delay).start()

    def _stop_server_with_delay(self):
        # stop() asks the server to quit; wait() blocks until it has.
        self.controlled_server.stop()
        self.controlled_server.wait()
357
358
class Client(object):
    """Base message sender: pushes pre-generated payloads and collects
    per-client statistics (sent, errors and, optionally, round trips)."""

    def __init__(self, client_id, client, method, has_result,
                 wait_after_msg):
        self.client_id = client_id
        self.client = client
        self.method = method
        self.wait_after_msg = wait_after_msg

        self.seq = 0
        self.messages_count = len(MESSAGES)
        # Start sending the messages from a random position to avoid
        # memory re-usage and generate more realistic load on the library
        # and a message transport
        self.position = random.randint(0, self.messages_count - 1)
        self.sent_messages = MessageStatsCollector('client-%s' % client_id)
        self.errors = MessageStatsCollector('error-%s' % client_id)

        if has_result:
            self.round_trip_messages = MessageStatsCollector(
                'round-trip-%s' % client_id)

    def host_based_id(self):
        """Return a client id that is unique across hosts and processes."""
        template = "%(client_id)s %(salt)s@%(hostname)s"
        return template % {'hostname': CURRENT_HOST,
                           'salt': hex(id(self))[2:],
                           'client_id': self.client_id}

    def send_msg(self):
        """Send the next payload, recording stats and any round trip."""
        outgoing = make_message(self.seq, MESSAGES[self.position],
                                time.time())
        self.sent_messages.push(outgoing)

        reply = None
        try:
            reply = self.method(self.client, outgoing)
        except Exception:
            self.errors.push(outgoing)
        else:
            LOG.debug("SENT: %s", outgoing)

        if reply:
            # Only RPC calls produce a reply; stamp its arrival time.
            self.round_trip_messages.push(
                update_message(reply, return_ts=time.time()))

        self.seq += 1
        self.position = (self.position + 1) % self.messages_count
        if self.wait_after_msg > 0:
            time.sleep(self.wait_after_msg)
407
408
class RPCClient(Client):
    """RPC message sender supporting call/cast and optional syncing.

    With *sync_mode* set ('call' or 'fanout'), the client registers with
    the server before its first message and reports completion so the
    server can shut itself down when all clients are done.
    """

    def __init__(self, client_id, transport, target, timeout, is_cast,
                 wait_after_msg, sync_mode=False):

        client = rpc.RPCClient(transport, target)
        method = _rpc_cast if is_cast else _rpc_call

        super(RPCClient, self).__init__(client_id,
                                        client.prepare(timeout=timeout),
                                        method,
                                        not is_cast, wait_after_msg)
        self.sync_mode = sync_mode
        self.is_sync = False

        # prepare the sync client
        if sync_mode:
            if sync_mode == 'call':
                self.sync_client = self.client
            else:
                self.sync_client = client.prepare(fanout=True, timeout=timeout)

    def send_msg(self):
        # Lazily register with the server before the first message.
        if self.sync_mode and not self.is_sync:
            self.is_sync = self.sync_start()
        super(RPCClient, self).send_msg()

    def _sync(self, remote_method, fail_msg, ok_msg):
        """Send one sync control message; return True on success.

        Shared implementation for sync_start/sync_done: both differ only
        in the remote method name and log wording.
        """
        try:
            msg = {'id': self.host_based_id()}
            method = _rpc_call if self.sync_mode == 'call' else _rpc_cast
            method(self.sync_client, msg, remote_method)
        except Exception:
            LOG.error(fail_msg % (self.client_id, self.client.target))
            return False
        LOG.info(ok_msg % (self.client_id, self.client.target))
        return True

    def sync_start(self):
        """Tell the server this client is starting to send messages."""
        return self._sync(
            'sync_start',
            'The client: %s failed to sync with %s.',
            'The client: %s successfully sync with  %s')

    def sync_done(self):
        """Tell the server this client has finished sending messages."""
        return self._sync(
            'sync_done',
            'The client: %s failed finish the sync with %s.',
            'The client: %s successfully finished sync with %s')
460
461
class NotifyClient(Client):
    """Client that emits notifications instead of RPC messages."""

    def __init__(self, client_id, transport, topic, wait_after_msg):
        notifier = notify.Notifier(transport, driver='messaging',
                                   topics=topic)
        prepared = notifier.prepare(publisher_id='publisher-%d' % client_id)
        super(NotifyClient, self).__init__(client_id, prepared, _notify,
                                           False, wait_after_msg)
469
470
def generate_messages(messages_count):
    """Fill the global MESSAGES list with random lowercase payloads.

    The count is capped at MESSAGES_LIMIT; clients simply cycle over the
    array again when they need to send more messages than that.
    """
    messages_count = min(messages_count, MESSAGES_LIMIT)
    LOG.info("Generating %d random messages", messages_count)
    random_length = init_random_generator()
    for _ in range(messages_count):
        payload = ''.join(random.choice(string.ascii_lowercase)
                          for _ in range(random_length()))
        MESSAGES.append(payload)

    LOG.info("Messages has been prepared")
485
486
def wrap_sigexit(f):
    """Decorator: on SignalExit, stop all servers and clean the transport.

    Any other outcome of *f* (normal return or other exception) passes
    through unchanged; transport cleanup always runs in ``finally``.
    """
    @functools.wraps(f)  # keep the wrapped function's name/doc intact
    def inner(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except SignalExit as e:
            LOG.info('Signal %s is caught. Interrupting the execution',
                     e.signo)
            for server in SERVERS:
                server.stop()
                server.wait()
        finally:
            if TRANSPORT:
                TRANSPORT.cleanup()
    return inner
501
502
@wrap_sigexit
def run_server(server, duration=None):
    """Start *server* and block until it stops.

    With *duration* set, the server is stopped after that many seconds
    (or earlier if the global run flag is cleared); otherwise it runs
    until stopped externally (e.g. via ServerControlEndpoint).
    """
    global IS_RUNNING
    SERVERS.append(server)
    server.start()
    if duration:
        with timeutils.StopWatch(duration) as stop_watch:
            while IS_RUNNING and not stop_watch.expired():
                time.sleep(1)
        server.stop()
        IS_RUNNING = False
    server.wait()
    LOG.info('The server is terminating')
    time.sleep(1)  # wait for stats collector to process the last second
517
518
def rpc_server(transport, target, wait_before_answer, executor, duration):
    """Run an RPC server on *target* and return its stats endpoint."""
    endpoints = [RpcEndpoint(wait_before_answer)]
    server = rpc.get_rpc_server(transport, target, endpoints, executor)

    # make the rpc server controllable by rpc clients
    endpoints.append(ServerControlEndpoint(server))

    LOG.debug("starting RPC server for target %s", target)

    run_server(server, duration=duration)

    # The first endpoint is the RpcEndpoint holding the statistics.
    return server.dispatcher.endpoints[0]
532
533
@wrap_sigexit
def spawn_rpc_clients(threads, transport, targets, wait_after_msg, timeout,
                      is_cast, messages_count, duration, sync_mode):
    """Spawn *threads* green threads, each driving one RPCClient."""
    pool = eventlet.GreenPool(size=threads)
    # Round-robin the configured targets over the clients.
    target_cycle = itertools.cycle(targets)

    for client_id in range(threads):
        target = next(target_cycle)
        LOG.debug("starting RPC client for target %s", target)
        client_builder = functools.partial(RPCClient, client_id, transport,
                                           target, timeout, is_cast,
                                           wait_after_msg, sync_mode)
        pool.spawn_n(send_messages, client_id, client_builder,
                     messages_count, duration)
    pool.waitall()
549
550
@wrap_sigexit
def spawn_notify_clients(threads, topic, transport, message_count,
                         wait_after_msg, timeout, duration):
    """Spawn *threads* green threads, each driving one NotifyClient.

    NOTE: *timeout* is accepted for signature parity with the RPC
    spawner but is not used by the notify client.
    """
    pool = eventlet.GreenPool(size=threads)
    for client_id in range(threads):
        client_builder = functools.partial(NotifyClient, client_id,
                                           transport, [topic],
                                           wait_after_msg)
        pool.spawn_n(send_messages, client_id, client_builder,
                     message_count, duration)
    pool.waitall()
560
561
def send_messages(client_id, client_builder, messages_count, duration):
    """Drive one client: build it, send its messages, then sync out.

    Sends either for *duration* seconds (clearing the global run flag
    when the first client's stopwatch expires) or a fixed
    *messages_count*, then reports completion to the server when the
    client registered via sync mode.
    """
    global IS_RUNNING
    client = client_builder()
    CLIENTS.append(client)

    # align message sending closer to whole seconds
    now = time.time()
    diff = int(now) - now + 1
    time.sleep(diff)

    if duration:
        with timeutils.StopWatch(duration) as stop_watch:
            while not stop_watch.expired() and IS_RUNNING:
                client.send_msg()
                eventlet.sleep()  # yield to other greenthreads
        IS_RUNNING = False
    else:
        LOG.debug("Sending %d messages using client %d",
                  messages_count, client_id)
        for _ in range(messages_count):
            client.send_msg()
            eventlet.sleep()  # yield to other greenthreads
            if not IS_RUNNING:
                break
        LOG.debug("Client %d has sent %d messages", client_id, messages_count)

    # wait for replies to be collected
    time.sleep(1)

    # send stop request to the rpc server
    if isinstance(client, RPCClient) and client.is_sync:
        client.sync_done()
594
595
596def _rpc_call(client, msg, remote_method='info'):
597    try:
598        res = client.call({}, remote_method, message=msg)
599    except Exception as e:
600        LOG.exception('Error %s on CALL for message %s', str(e), msg)
601        raise
602    else:
603        LOG.debug("SENT: %s, RCV: %s", msg, res)
604        return res
605
606
607def _rpc_cast(client, msg, remote_method='info'):
608    try:
609        client.cast({}, remote_method, message=msg)
610    except Exception as e:
611        LOG.exception('Error %s on CAST for message %s', str(e), msg)
612        raise
613    else:
614        LOG.debug("SENT: %s", msg)
615
616
617def _notify(notification_client, msg):
618    notification_client.info({}, 'compute.start', msg)
619
620
def show_server_stats(endpoint, json_filename):
    """Log the server-side stats summary; optionally dump JSON to a file."""
    LOG.info('=' * 35 + ' summary ' + '=' * 35)
    output = {
        'series': {'server': endpoint.received_messages.get_series()},
        'summary': MessageStatsCollector.calc_stats(
            'server', endpoint.received_messages),
    }

    if json_filename:
        write_json_file(json_filename, output)
631
632
def show_client_stats(clients, json_filename, has_reply=False):
    """Log aggregated client stats; optionally dump JSON to a file.

    *has_reply* adds round-trip series/summary (RPC calls only).
    """
    LOG.info('=' * 35 + ' summary ' + '=' * 35)
    output = dict(series={}, summary={})
    series = output['series']

    for cl in clients:
        cl_id = cl.client_id
        series['client_%s' % cl_id] = cl.sent_messages.get_series()
        series['error_%s' % cl_id] = cl.errors.get_series()
        if has_reply:
            series['round_trip_%s' % cl_id] = (
                cl.round_trip_messages.get_series())

    output['summary']['client'] = MessageStatsCollector.calc_stats(
        'client', *(cl.sent_messages for cl in clients))
    output['summary']['error'] = MessageStatsCollector.calc_stats(
        'error', *(cl.errors for cl in clients))
    if has_reply:
        output['summary']['round_trip'] = MessageStatsCollector.calc_stats(
            'round-trip', *(cl.round_trip_messages for cl in clients))

    if json_filename:
        write_json_file(json_filename, output)
661
662
def write_json_file(filename, output):
    """Serialize *output* as JSON into *filename*."""
    with open(filename, 'w') as f:
        json.dump(output, f)
        LOG.info('Stats are written into %s', filename)
667
668
class SignalExit(SystemExit):
    """SystemExit raised from the signal handler, carrying the signal
    number so wrap_sigexit can log which signal interrupted the run."""

    def __init__(self, signo, exccode=1):
        super(SignalExit, self).__init__(exccode)
        # signal number that triggered the shutdown
        self.signo = signo
673
674
def signal_handler(signum, frame):
    """SIGTERM/SIGINT handler: clear the global run flag so loops stop,
    then unwind the stack via SignalExit (caught by wrap_sigexit)."""
    global IS_RUNNING
    IS_RUNNING = False

    raise SignalExit(signum)
680
681
682def _setup_logging(is_debug):
683    log_level = logging.DEBUG if is_debug else logging.INFO
684    logging.basicConfig(
685        stream=sys.stdout, level=log_level,
686        format="%(asctime)-15s %(levelname)s %(name)s %(message)s")
687    logging.getLogger().handlers[0].addFilter(LoggingNoParsingFilter())
688    for i in ['kombu', 'amqp', 'stevedore', 'qpid.messaging'
689              'oslo.messaging._drivers.amqp', ]:
690        logging.getLogger(i).setLevel(logging.WARN)
691
692
def main():
    """Entry point: parse CLI args, set up the transport and dispatch to
    the selected server/client mode, then report collected statistics."""
    parser = argparse.ArgumentParser(
        description='Tools to play with oslo.messaging\'s RPC',
        usage=USAGE,
    )
    parser.add_argument('--url', dest='url',
                        help="oslo.messaging transport url")
    parser.add_argument('-d', '--debug', dest='debug', action='store_true',
                        help="Turn on DEBUG logging level instead of WARN")
    parser.add_argument('-tp', '--topic', dest='topic',
                        default="profiler_topic",
                        help="Topics to publish/receive messages to/from.")
    parser.add_argument('-s', '--server', dest='server',
                        default="profiler_server",
                        help="Servers to publish/receive messages to/from.")
    parser.add_argument('-tg', '--targets', dest='targets', nargs="+",
                        default=["profiler_topic.profiler_server"],
                        help="Targets to publish/receive messages to/from.")
    parser.add_argument('-l', dest='duration', type=int,
                        help='send messages for certain time')
    parser.add_argument('-j', '--json', dest='json_filename',
                        help='File name to store results in JSON format')
    parser.add_argument('--config-file', dest='config_file', type=str,
                        help="Oslo messaging config file")

    # One sub-command per operating mode.
    subparsers = parser.add_subparsers(dest='mode',
                                       help='notify/rpc server/client mode')

    server = subparsers.add_parser('notify-server')
    server.add_argument('-w', dest='wait_before_answer', type=int, default=-1)
    server.add_argument('--requeue', dest='requeue', action='store_true')

    server = subparsers.add_parser('batch-notify-server')
    server.add_argument('-w', dest='wait_before_answer', type=int, default=-1)
    server.add_argument('--requeue', dest='requeue', action='store_true')

    client = subparsers.add_parser('notify-client')
    client.add_argument('-p', dest='threads', type=int, default=1,
                        help='number of client threads')
    client.add_argument('-m', dest='messages', type=int, default=1,
                        help='number of call per threads')
    client.add_argument('-w', dest='wait_after_msg', type=float, default=-1,
                        help='sleep time between two messages')
    client.add_argument('--timeout', dest='timeout', type=int, default=3,
                        help='client timeout')

    server = subparsers.add_parser('rpc-server')
    server.add_argument('-w', dest='wait_before_answer', type=int, default=-1)
    server.add_argument('-e', '--executor', dest='executor',
                        type=str, default='eventlet',
                        help='name of a message executor')

    client = subparsers.add_parser('rpc-client')
    client.add_argument('-p', dest='threads', type=int, default=1,
                        help='number of client threads')
    client.add_argument('-m', dest='messages', type=int, default=1,
                        help='number of call per threads')
    client.add_argument('-w', dest='wait_after_msg', type=float, default=-1,
                        help='sleep time between two messages')
    client.add_argument('--timeout', dest='timeout', type=int, default=3,
                        help='client timeout')
    client.add_argument('--exit-wait', dest='exit_wait', type=int, default=0,
                        help='Keep connections open N seconds after calls '
                        'have been done')
    client.add_argument('--is-cast', dest='is_cast', action='store_true',
                        help='Use `call` or `cast` RPC methods')
    client.add_argument('--is-fanout', dest='is_fanout', action='store_true',
                        help='fanout=True for CAST messages')

    client.add_argument('--sync', dest='sync', choices=('call', 'fanout'),
                        help="stop server when all msg was sent by clients")

    args = parser.parse_args()

    _setup_logging(is_debug=args.debug)

    if args.config_file:
        cfg.CONF(["--config-file", args.config_file])

    # RPC and notifications use differently configured transports.
    global TRANSPORT
    if args.mode in ['rpc-server', 'rpc-client']:
        TRANSPORT = messaging.get_transport(cfg.CONF, url=args.url)
    else:
        TRANSPORT = messaging.get_notification_transport(cfg.CONF,
                                                         url=args.url)

    if args.mode in ['rpc-client', 'notify-client']:
        # always generate maximum number of messages for duration-limited tests
        generate_messages(MESSAGES_LIMIT if args.duration else args.messages)

    # oslo.config defaults
    cfg.CONF.heartbeat_interval = 5
    cfg.CONF.prog = os.path.basename(__file__)
    cfg.CONF.project = 'oslo.messaging'

    # Allow graceful interruption; both signals unwind via SignalExit.
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    if args.mode == 'rpc-server':
        target = messaging.Target(topic=args.topic, server=args.server)
        endpoint = rpc_server(TRANSPORT, target, args.wait_before_answer,
                              args.executor, args.duration)
        show_server_stats(endpoint, args.json_filename)

    elif args.mode == 'notify-server':
        endpoint = notify_server(TRANSPORT, args.topic,
                                 args.wait_before_answer, args.duration,
                                 args.requeue)
        show_server_stats(endpoint, args.json_filename)

    elif args.mode == 'batch-notify-server':
        endpoint = batch_notify_server(TRANSPORT, args.topic,
                                       args.wait_before_answer,
                                       args.duration, args.requeue)
        show_server_stats(endpoint, args.json_filename)

    elif args.mode == 'notify-client':
        spawn_notify_clients(args.threads, args.topic, TRANSPORT,
                             args.messages, args.wait_after_msg,
                             args.timeout, args.duration)
        show_client_stats(CLIENTS, args.json_filename)

    elif args.mode == 'rpc-client':

        # Parse 'topic.server' strings into Targets.
        targets = []
        for target in args.targets:
            tp, srv = target.partition('.')[::2]
            t = messaging.Target(topic=tp, server=srv, fanout=args.is_fanout)
            targets.append(t)

        spawn_rpc_clients(args.threads, TRANSPORT, targets,
                          args.wait_after_msg, args.timeout, args.is_cast,
                          args.messages, args.duration, args.sync)
        # Round-trip stats exist only for `call` (cast has no replies).
        show_client_stats(CLIENTS, args.json_filename, not args.is_cast)

        if args.exit_wait:
            LOG.info("Finished. waiting for %d seconds", args.exit_wait)
            time.sleep(args.exit_wait)
831
832
if __name__ == '__main__':
    # Capture process identity (used for host-unique client ids) before
    # entering the main dispatch loop.
    CURRENT_PID = os.getpid()
    CURRENT_HOST = socket.gethostname()
    main()
837