#!/usr/bin/env python3
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Load simulator for oslo.messaging RPC and notification transports.

Runs either a server (rpc / notify / batch-notify) that records received
messages, or a pool of green-thread clients that send messages, and prints
(optionally dumps as JSON) per-second and summary throughput statistics.
"""

try:
    # Avoid https://github.com/PyCQA/pycodestyle/issues/472
    import eventlet
    eventlet.monkey_patch()
except ImportError:
    raise

import argparse
import bisect
import collections
import functools
import itertools
import json
import logging
import os
import random
import signal
import socket
import string
import sys
import threading
import time
import yaml

from oslo_config import cfg
import oslo_messaging as messaging
from oslo_messaging import notify  # noqa
from oslo_messaging import rpc  # noqa
from oslo_utils import timeutils

LOG = logging.getLogger()
CURRENT_PID = None
CURRENT_HOST = None
CLIENTS = []
MESSAGES = []
IS_RUNNING = True
SERVERS = []
TRANSPORT = None

USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\
 {notify-server,notify-client,rpc-server,rpc-client} ...

Usage example:
 python tools/simulator.py\
 --url rabbit://stackrabbit:secretrabbit@localhost/ rpc-server
 python tools/simulator.py\
 --url rabbit://stackrabbit:secretrabbit@localhost/ rpc-client\
 --exit-wait 15000 -p 64 -m 64"""

MESSAGES_LIMIT = 1000
DISTRIBUTION_BUCKET_SIZE = 500


def init_random_generator():
    """Build a message-length generator from ``messages_length.yaml``.

    The YAML file located next to this script provides a comma-separated
    list of sample lengths under ``test_data.string_lengths``. The samples
    are grouped into buckets of DISTRIBUTION_BUCKET_SIZE and the returned
    callable picks a bucket with probability proportional to its sample
    count, then a uniformly random length inside that bucket.

    :returns: a zero-argument callable returning a random message length
    """
    data = []
    file_dir = os.path.dirname(os.path.abspath(__file__))
    with open(os.path.join(file_dir, 'messages_length.yaml')) as m_file:
        content = yaml.safe_load(m_file)
        data += [int(n) for n in content[
            'test_data']['string_lengths'].split(', ')]

    # bucket start -> number of samples falling into the bucket
    ranges = collections.defaultdict(int)
    for msg_length in data:
        range_start = ((msg_length // DISTRIBUTION_BUCKET_SIZE) *
                       DISTRIBUTION_BUCKET_SIZE + 1)
        ranges[range_start] += 1

    ranges_start = sorted(ranges.keys())
    total_count = len(data)

    # cumulative distribution function over the sorted buckets
    accumulated_distribution = []
    running_total = 0
    for range_start in ranges_start:
        norm = float(ranges[range_start]) / total_count
        running_total += norm
        accumulated_distribution.append(running_total)

    def weighted_random_choice():
        r = random.random() * running_total
        idx = bisect.bisect_right(accumulated_distribution, r)
        # The product above may round up to running_total itself, in which
        # case bisect_right points past the last bucket; clamp the index.
        idx = min(idx, len(ranges_start) - 1)
        start = ranges_start[idx]
        return random.randrange(start, start + DISTRIBUTION_BUCKET_SIZE)

    return weighted_random_choice


class LoggingNoParsingFilter(logging.Filter):
    """Drop log records whose message contains known noisy markers."""

    def filter(self, record):
        msg = record.getMessage()
        return not any(i in msg for i in ['received {', 'MSG_ID is '])


Message = collections.namedtuple(
    'Message', ['seq', 'cargo', 'client_ts', 'server_ts', 'return_ts'])


def make_message(seq, cargo, client_ts=0, server_ts=0, return_ts=0):
    """Create a Message; timestamps that are not yet known default to 0."""
    return Message(seq, cargo, client_ts, server_ts, return_ts)


def update_message(message, **kwargs):
    """Return a Message built from ``message`` with some fields replaced.

    ``message`` may be any 5-item sequence in Message field order, not only
    a Message instance.
    """
    return Message(*message)._replace(**kwargs)


class MessageStatsCollector(object):
    """Collect messages and aggregate them into per-interval statistics.

    A timer is started on construction; on every tick ``monitor`` turns the
    messages pushed since the previous tick into one entry of ``series``.
    """

    def __init__(self, label):
        self.label = label
        self.buffer = []  # buffer to store messages during report interval
        self.series = []  # stats for every report interval

        now = time.time()
        diff = int(now) - now + 1  # align start to whole seconds
        threading.Timer(diff, self.monitor).start()  # schedule in a second

    def monitor(self):
        """Aggregate buffered messages into one stats record and log it.

        Re-arms itself every second for as long as IS_RUNNING is set.
        """
        global IS_RUNNING
        if IS_RUNNING:
            # NOTE(kbespalov): this does not work precisely: the monitor
            # fires within 1 sec +-150 ms because of high threading
            # contention between rpc clients
            threading.Timer(1.0, self.monitor).start()
        now = time.time()

        # Snapshot the length first: messages may be pushed while the
        # stats are being computed and must stay for the next interval.
        count = len(self.buffer)

        size = 0
        min_latency = sys.maxsize
        max_latency = 0
        sum_latencies = 0

        for p in self.buffer[:count]:
            size += len(p.cargo)

            latency = None
            if p.return_ts:
                latency = p.return_ts - p.client_ts  # round-trip
            elif p.server_ts:
                latency = p.server_ts - p.client_ts  # client -> server

            if latency:
                sum_latencies += latency
                min_latency = min(min_latency, latency)
                max_latency = max(max_latency, latency)

        del self.buffer[:count]  # trim processed items

        seq = len(self.series)
        stats = dict(seq=seq, timestamp=now, count=count, size=size)
        msg = ('%-14s: seq: %-4d count: %-6d bytes: %-10d' %
               (self.label, seq, count, size))

        if sum_latencies:
            latency = sum_latencies / count
            stats.update(dict(latency=latency,
                              min_latency=min_latency,
                              max_latency=max_latency))
            msg += (' latency: %-9.3f min: %-9.3f max: %-9.3f' %
                    (latency, min_latency, max_latency))

        self.series.append(stats)
        LOG.info(msg)

    def push(self, parsed_message):
        """Queue a message for the next report interval."""
        self.buffer.append(parsed_message)

    def get_series(self):
        """Return the list of per-interval stats dicts collected so far."""
        return self.series

    @staticmethod
    def calc_stats(label, *collectors):
        """Merge the series of several collectors into one summary.

        :param label: prefix used in the logged summary line
        :param collectors: MessageStatsCollector instances to merge
        :returns: dict with count, size, duration, count_p_s and size_p_s;
                  start/end are added when the duration is non-zero and
                  latency/min_latency/max_latency when latencies were seen
        """
        count = 0
        size = 0
        min_latency = sys.maxsize
        max_latency = 0
        sum_latencies = 0
        start = sys.maxsize
        end = 0

        for point in itertools.chain(*(c.get_series() for c in collectors)):
            count += point['count']
            size += point['size']
            if point['count']:
                # NOTE(kbespalov):
                # we take the start and end time to be the time of the
                # first and last processed message; there is no reason
                # to move the boundaries if the server was idle before
                # the clients started and after they finished.
                start = min(start, point['timestamp'])
                end = max(end, point['timestamp'])

            if 'latency' in point:
                sum_latencies += point['latency'] * point['count']
                min_latency = min(min_latency, point['min_latency'])
                max_latency = max(max_latency, point['max_latency'])

        # start is the timestamp of the earliest block, which includes
        # samples for the prior second
        start -= 1
        duration = end - start if count else 0
        stats = dict(count=count, size=size, duration=duration, count_p_s=0,
                     size_p_s=0)
        if duration:
            stats.update(dict(start=start, end=end,
                              count_p_s=count / duration,
                              size_p_s=size / duration))

        msg = ('%s: duration: %.2f count: %d (%.1f msg/sec) '
               'bytes: %d (%.0f bps)' %
               (label, duration, count, stats['count_p_s'],
                size, stats['size_p_s']))

        if sum_latencies:
            latency = sum_latencies / count
            stats.update(dict(latency=latency,
                              min_latency=min_latency,
                              max_latency=max_latency))
            msg += (' latency: %.3f min: %.3f max: %.3f' %
                    (latency, min_latency, max_latency))

        LOG.info(msg)
        return stats


class NotifyEndpoint(object):
    """Notification endpoint that records every received message."""

    def __init__(self, wait_before_answer, requeue):
        self.wait_before_answer = wait_before_answer
        self.requeue = requeue
        self.received_messages = MessageStatsCollector('server')
        self.cache = set()  # seq numbers that were already requeued once

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        """Record the notification; requeue it once if requeue is enabled."""
        LOG.debug("%s %s %s %s", ctxt, publisher_id, event_type, payload)

        server_ts = time.time()

        message = update_message(payload, server_ts=server_ts)
        self.received_messages.push(message)

        if self.requeue and message.seq not in self.cache:
            self.cache.add(message.seq)

            if self.wait_before_answer > 0:
                time.sleep(self.wait_before_answer)

            return messaging.NotificationResult.REQUEUE

        return messaging.NotificationResult.HANDLED


def notify_server(transport, topic, wait_before_answer, duration, requeue):
    """Run a notification listener and return its endpoint with the stats."""
    endpoints = [NotifyEndpoint(wait_before_answer, requeue)]
    target = messaging.Target(topic=topic)
    server = notify.get_notification_listener(transport, [target],
                                              endpoints, executor='eventlet')
    run_server(server, duration=duration)

    return endpoints[0]


class BatchNotifyEndpoint(object):
    """Batch notification endpoint that records every received message."""

    def __init__(self, wait_before_answer, requeue):
        self.wait_before_answer = wait_before_answer
        self.requeue = requeue
        self.received_messages = MessageStatsCollector('server')
        self.cache = set()

    def info(self, batch):
        """Record each item of the batch with a shared server timestamp."""
        LOG.debug('msg rcv')
        LOG.debug("%s", batch)

        server_ts = time.time()

        for item in batch:
            message = update_message(item['payload'], server_ts=server_ts)
            self.received_messages.push(message)

        return messaging.NotificationResult.HANDLED


def batch_notify_server(transport, topic, wait_before_answer, duration,
                        requeue):
    """Run a batch notification listener and return its endpoint."""
    endpoints = [BatchNotifyEndpoint(wait_before_answer, requeue)]
    target = messaging.Target(topic=topic)
    server = notify.get_batch_notification_listener(
        transport, [target],
        endpoints, executor='eventlet',
        batch_size=1000, batch_timeout=5)
    run_server(server, duration=duration)

    return endpoints[0]


class RpcEndpoint(object):
    """RPC endpoint that timestamps, records and echoes each message."""

    def __init__(self, wait_before_answer):
        self.wait_before_answer = wait_before_answer
        self.received_messages = MessageStatsCollector('server')

    def info(self, ctxt, message):
        """Record the message and return it with ``server_ts`` filled in."""
        server_ts = time.time()

        LOG.debug("######## RCV: %s", message)

        reply = update_message(message, server_ts=server_ts)
        self.received_messages.push(reply)

        if self.wait_before_answer > 0:
            time.sleep(self.wait_before_answer)

        return reply


class ServerControlEndpoint(object):
    """RPC endpoint letting clients stop the server once they are done."""

    def __init__(self, controlled_server):
        self.connected_clients = set()
        self.controlled_server = controlled_server

    def sync_start(self, ctx, message):
        """Handle start reports from clients"""

        client_id = message['id']
        LOG.info('The client %s started to send messages', client_id)
        self.connected_clients.add(client_id)

    def sync_done(self, ctx, message):
        """Handle done reports from clients"""

        client_id = message['id']
        LOG.info('The client %s finished msg sending.', client_id)

        if client_id in self.connected_clients:
            self.connected_clients.remove(client_id)

            if not self.connected_clients:
                LOG.info(
                    'The clients sent all messages. Shutting down the server..')
                # stop from a timer thread instead of from inside the
                # request handler that is currently being served
                threading.Timer(1, self._stop_server_with_delay).start()

    def _stop_server_with_delay(self):
        self.controlled_server.stop()
        self.controlled_server.wait()


class Client(object):
    """Base message sender that keeps sent/error/round-trip statistics.

    :param client_id: identifier used in the stats labels
    :param client: prepared messaging client passed to ``method``
    :param method: callable ``method(client, msg)`` that sends one message
    :param has_result: whether ``method`` returns a reply to be recorded
    :param wait_after_msg: seconds to sleep after each message (if > 0)
    """

    def __init__(self, client_id, client, method, has_result,
                 wait_after_msg):
        self.client_id = client_id
        self.client = client
        self.method = method
        self.wait_after_msg = wait_after_msg

        self.seq = 0
        self.messages_count = len(MESSAGES)
        # Start sending the messages from a random position to avoid
        # memory re-usage and generate more realistic load on the library
        # and a message transport
        if self.messages_count:
            self.position = random.randint(0, self.messages_count - 1)
        else:
            # nothing was generated (e.g. zero messages requested);
            # randint(0, -1) would raise although nothing will be sent
            self.position = 0
        self.sent_messages = MessageStatsCollector('client-%s' % client_id)
        self.errors = MessageStatsCollector('error-%s' % client_id)

        if has_result:
            self.round_trip_messages = MessageStatsCollector(
                'round-trip-%s' % client_id)

    def host_based_id(self):
        """Return an id combining client id, object id and the host name."""
        _id = "%(client_id)s %(salt)s@%(hostname)s"
        return _id % {'hostname': CURRENT_HOST,
                      'salt': hex(id(self))[2:],
                      'client_id': self.client_id}

    def send_msg(self):
        """Send the next prepared message and record the outcome."""
        msg = make_message(self.seq, MESSAGES[self.position], time.time())
        self.sent_messages.push(msg)

        res = None
        try:
            res = self.method(self.client, msg)
        except Exception:
            # the failure is counted in the error stats; sending goes on
            self.errors.push(msg)
        else:
            LOG.debug("SENT: %s", msg)

        if res:
            return_ts = time.time()
            res = update_message(res, return_ts=return_ts)
            self.round_trip_messages.push(res)

        self.seq += 1
        self.position = (self.position + 1) % self.messages_count
        if self.wait_after_msg > 0:
            time.sleep(self.wait_after_msg)


class RPCClient(Client):
    """Client sending messages via RPC call or cast.

    When ``sync_mode`` is set ('call' or any other truthy value, which is
    treated as fanout cast) the client reports its start and finish to the
    server control endpoint.
    """

    def __init__(self, client_id, transport, target, timeout, is_cast,
                 wait_after_msg, sync_mode=False):

        client = rpc.RPCClient(transport, target)
        method = _rpc_cast if is_cast else _rpc_call

        super(RPCClient, self).__init__(client_id,
                                        client.prepare(timeout=timeout),
                                        method,
                                        not is_cast, wait_after_msg)
        self.sync_mode = sync_mode
        self.is_sync = False

        # prepare the sync client
        if sync_mode:
            if sync_mode == 'call':
                self.sync_client = self.client
            else:
                self.sync_client = client.prepare(fanout=True, timeout=timeout)

    def send_msg(self):
        """Send a message, reporting the start to the server first."""
        if self.sync_mode and not self.is_sync:
            self.is_sync = self.sync_start()
        super(RPCClient, self).send_msg()

    def sync_start(self):
        """Report the start of sending; return True on success."""
        try:
            msg = {'id': self.host_based_id()}
            method = _rpc_call if self.sync_mode == 'call' else _rpc_cast
            method(self.sync_client, msg, 'sync_start')
        except Exception:
            LOG.error('The client: %s failed to sync with %s.',
                      self.client_id, self.client.target)
            return False
        LOG.info('The client: %s successfully sync with %s',
                 self.client_id, self.client.target)
        return True

    def sync_done(self):
        """Report the end of sending; return True on success."""
        try:
            msg = {'id': self.host_based_id()}
            method = _rpc_call if self.sync_mode == 'call' else _rpc_cast
            method(self.sync_client, msg, 'sync_done')
        except Exception:
            LOG.error('The client: %s failed finish the sync with %s.',
                      self.client_id, self.client.target)
            return False
        LOG.info('The client: %s successfully finished sync with %s',
                 self.client_id, self.client.target)
        return True


class NotifyClient(Client):
    """Client sending messages as notifications."""

    def __init__(self, client_id, transport, topic, wait_after_msg):
        client = notify.Notifier(transport, driver='messaging', topics=topic)
        client = client.prepare(publisher_id='publisher-%d' % client_id)
        method = _notify
        super(NotifyClient, self).__init__(client_id, client, method,
                                           False, wait_after_msg)


def generate_messages(messages_count):
    """Fill the global MESSAGES list with random lowercase strings.

    :param messages_count: requested number of messages; capped at
                           MESSAGES_LIMIT
    """
    # Limit the messages amount. Clients will reiterate the array again
    # if an amount of messages to be sent is bigger than MESSAGES_LIMIT
    if messages_count > MESSAGES_LIMIT:
        messages_count = MESSAGES_LIMIT
    LOG.info("Generating %d random messages", messages_count)
    generator = init_random_generator()
    for _ in range(messages_count):
        length = generator()
        msg = ''.join(random.choices(string.ascii_lowercase, k=length))
        MESSAGES.append(msg)

    LOG.info("Messages has been prepared")


def wrap_sigexit(f):
    """Decorate ``f`` to stop all servers when SignalExit is raised.

    SignalExit is swallowed (the wrapper then returns None) and the global
    transport, if any, is cleaned up after every call of ``f``.
    """
    @functools.wraps(f)
    def inner(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except SignalExit as e:
            LOG.info('Signal %s is caught. Interrupting the execution',
                     e.signo)
            for server in SERVERS:
                server.stop()
                server.wait()
        finally:
            if TRANSPORT:
                TRANSPORT.cleanup()
    return inner


@wrap_sigexit
def run_server(server, duration=None):
    """Start ``server`` and block until it is stopped.

    :param server: messaging server providing start/stop/wait
    :param duration: when set, stop the server after that many seconds (or
                     earlier, once IS_RUNNING is cleared)
    """
    global IS_RUNNING
    SERVERS.append(server)
    server.start()
    if duration:
        with timeutils.StopWatch(duration) as stop_watch:
            while not stop_watch.expired() and IS_RUNNING:
                time.sleep(1)
        server.stop()
        IS_RUNNING = False
    server.wait()
    LOG.info('The server is terminating')
    time.sleep(1)  # wait for stats collector to process the last second


def rpc_server(transport, target, wait_before_answer, executor, duration):
    """Run an RPC server and return the endpoint holding the stats."""

    endpoints = [RpcEndpoint(wait_before_answer)]
    server = rpc.get_rpc_server(transport, target, endpoints, executor)

    # make the rpc server controllable by rpc clients
    endpoints.append(ServerControlEndpoint(server))

    LOG.debug("starting RPC server for target %s", target)

    run_server(server, duration=duration)

    return server.dispatcher.endpoints[0]


@wrap_sigexit
def spawn_rpc_clients(threads, transport, targets, wait_after_msg, timeout,
                      is_cast, messages_count, duration, sync_mode):
    """Run ``threads`` RPC clients in green threads and wait for them.

    Targets are assigned to the clients in round-robin order.
    """
    p = eventlet.GreenPool(size=threads)
    targets = itertools.cycle(targets)

    for i in range(threads):
        target = next(targets)
        LOG.debug("starting RPC client for target %s", target)
        client_builder = functools.partial(RPCClient, i, transport, target,
                                           timeout, is_cast, wait_after_msg,
                                           sync_mode)
        p.spawn_n(send_messages, i, client_builder,
                  messages_count, duration)
    p.waitall()


@wrap_sigexit
def spawn_notify_clients(threads, topic, transport, message_count,
                         wait_after_msg, timeout, duration):
    """Run ``threads`` notification clients in green threads and wait."""
    p = eventlet.GreenPool(size=threads)
    for i in range(threads):
        client_builder = functools.partial(NotifyClient, i, transport, [topic],
                                           wait_after_msg)
        p.spawn_n(send_messages, i, client_builder, message_count, duration)
    p.waitall()


def send_messages(client_id, client_builder, messages_count, duration):
    """Build a client and send messages for a time span or a fixed count.

    :param client_id: identifier used for logging
    :param client_builder: zero-argument callable returning the client
    :param messages_count: number of messages to send when no duration
    :param duration: when set, keep sending for that many seconds
    """
    global IS_RUNNING
    client = client_builder()
    CLIENTS.append(client)

    # align message sending closer to whole seconds
    now = time.time()
    diff = int(now) - now + 1
    time.sleep(diff)

    if duration:
        with timeutils.StopWatch(duration) as stop_watch:
            while not stop_watch.expired() and IS_RUNNING:
                client.send_msg()
                eventlet.sleep()
        IS_RUNNING = False
    else:
        LOG.debug("Sending %d messages using client %d",
                  messages_count, client_id)
        for _ in range(messages_count):
            client.send_msg()
            eventlet.sleep()
            if not IS_RUNNING:
                break
        LOG.debug("Client %d has sent %d messages", client_id, messages_count)

    # wait for replies to be collected
    time.sleep(1)

    # send stop request to the rpc server
    if isinstance(client, RPCClient) and client.is_sync:
        client.sync_done()


def _rpc_call(client, msg, remote_method='info'):
    """Invoke ``remote_method`` via call and return its reply."""
    try:
        res = client.call({}, remote_method, message=msg)
    except Exception as e:
        LOG.exception('Error %s on CALL for message %s', str(e), msg)
        raise
    else:
        LOG.debug("SENT: %s, RCV: %s", msg, res)
        return res


def _rpc_cast(client, msg, remote_method='info'):
    """Invoke ``remote_method`` via cast; nothing is returned."""
    try:
        client.cast({}, remote_method, message=msg)
    except Exception as e:
        LOG.exception('Error %s on CAST for message %s', str(e), msg)
        raise
    else:
        LOG.debug("SENT: %s", msg)


def _notify(notification_client, msg):
    """Emit ``msg`` as an info notification of type 'compute.start'."""
    notification_client.info({}, 'compute.start', msg)


def show_server_stats(endpoint, json_filename):
    """Log the server summary and optionally write it to a JSON file."""
    LOG.info('=' * 35 + ' summary ' + '=' * 35)
    output = dict(series={}, summary={})
    output['series']['server'] = endpoint.received_messages.get_series()
    stats = MessageStatsCollector.calc_stats(
        'server', endpoint.received_messages)
    output['summary'] = stats

    if json_filename:
        write_json_file(json_filename, output)


def show_client_stats(clients, json_filename, has_reply=False):
    """Log the clients summary and optionally write it to a JSON file.

    :param clients: Client instances whose collectors are summarized
    :param json_filename: output file name; nothing is written when falsy
    :param has_reply: also include the round-trip statistics
    """
    LOG.info('=' * 35 + ' summary ' + '=' * 35)
    output = dict(series={}, summary={})

    for cl in clients:
        cl_id = cl.client_id
        output['series']['client_%s' % cl_id] = cl.sent_messages.get_series()
        output['series']['error_%s' % cl_id] = cl.errors.get_series()

        if has_reply:
            output['series']['round_trip_%s' % cl_id] = (
                cl.round_trip_messages.get_series())

    sent_stats = MessageStatsCollector.calc_stats(
        'client', *(cl.sent_messages for cl in clients))
    output['summary']['client'] = sent_stats

    error_stats = MessageStatsCollector.calc_stats(
        'error', *(cl.errors for cl in clients))
    output['summary']['error'] = error_stats

    if has_reply:
        round_trip_stats = MessageStatsCollector.calc_stats(
            'round-trip', *(cl.round_trip_messages for cl in clients))
        output['summary']['round_trip'] = round_trip_stats

    if json_filename:
        write_json_file(json_filename, output)


def write_json_file(filename, output):
    """Serialize ``output`` as JSON into ``filename``."""
    with open(filename, 'w') as f:
        json.dump(output, f)
    LOG.info('Stats are written into %s', filename)


class SignalExit(SystemExit):
    """SystemExit carrying the number of the signal that caused it."""

    def __init__(self, signo, exccode=1):
        super(SignalExit, self).__init__(exccode)
        self.signo = signo


def signal_handler(signum, frame):
    """Clear IS_RUNNING and raise SignalExit for the received signal."""
    global IS_RUNNING
    IS_RUNNING = False

    raise SignalExit(signum)


def _setup_logging(is_debug):
    """Configure stdout logging and quieten noisy third-party loggers."""
    log_level = logging.DEBUG if is_debug else logging.INFO
    logging.basicConfig(
        stream=sys.stdout, level=log_level,
        format="%(asctime)-15s %(levelname)s %(name)s %(message)s")
    logging.getLogger().handlers[0].addFilter(LoggingNoParsingFilter())
    # NOTE: every name needs its own trailing comma; adjacent string
    # literals are silently concatenated into a single logger name.
    for i in ['kombu', 'amqp', 'stevedore', 'qpid.messaging',
              'oslo.messaging._drivers.amqp', ]:
        logging.getLogger(i).setLevel(logging.WARN)


def main():
    """Parse the command line and run the selected server or client mode."""
    global TRANSPORT

    parser = argparse.ArgumentParser(
        description='Tools to play with oslo.messaging\'s RPC',
        usage=USAGE,
    )
    parser.add_argument('--url', dest='url',
                        help="oslo.messaging transport url")
    parser.add_argument('-d', '--debug', dest='debug', action='store_true',
                        help="Turn on DEBUG logging level instead of INFO")
    parser.add_argument('-tp', '--topic', dest='topic',
                        default="profiler_topic",
                        help="Topics to publish/receive messages to/from.")
    parser.add_argument('-s', '--server', dest='server',
                        default="profiler_server",
                        help="Servers to publish/receive messages to/from.")
    parser.add_argument('-tg', '--targets', dest='targets', nargs="+",
                        default=["profiler_topic.profiler_server"],
                        help="Targets to publish/receive messages to/from.")
    parser.add_argument('-l', dest='duration', type=int,
                        help='send messages for certain time')
    parser.add_argument('-j', '--json', dest='json_filename',
                        help='File name to store results in JSON format')
    parser.add_argument('--config-file', dest='config_file', type=str,
                        help="Oslo messaging config file")

    subparsers = parser.add_subparsers(dest='mode',
                                       help='notify/rpc server/client mode')

    server = subparsers.add_parser('notify-server')
    server.add_argument('-w', dest='wait_before_answer', type=int, default=-1)
    server.add_argument('--requeue', dest='requeue', action='store_true')

    server = subparsers.add_parser('batch-notify-server')
    server.add_argument('-w', dest='wait_before_answer', type=int, default=-1)
    server.add_argument('--requeue', dest='requeue', action='store_true')

    client = subparsers.add_parser('notify-client')
    client.add_argument('-p', dest='threads', type=int, default=1,
                        help='number of client threads')
    client.add_argument('-m', dest='messages', type=int, default=1,
                        help='number of call per threads')
    client.add_argument('-w', dest='wait_after_msg', type=float, default=-1,
                        help='sleep time between two messages')
    client.add_argument('--timeout', dest='timeout', type=int, default=3,
                        help='client timeout')

    server = subparsers.add_parser('rpc-server')
    server.add_argument('-w', dest='wait_before_answer', type=int, default=-1)
    server.add_argument('-e', '--executor', dest='executor',
                        type=str, default='eventlet',
                        help='name of a message executor')

    client = subparsers.add_parser('rpc-client')
    client.add_argument('-p', dest='threads', type=int, default=1,
                        help='number of client threads')
    client.add_argument('-m', dest='messages', type=int, default=1,
                        help='number of call per threads')
    client.add_argument('-w', dest='wait_after_msg', type=float, default=-1,
                        help='sleep time between two messages')
    client.add_argument('--timeout', dest='timeout', type=int, default=3,
                        help='client timeout')
    client.add_argument('--exit-wait', dest='exit_wait', type=int, default=0,
                        help='Keep connections open N seconds after calls '
                        'have been done')
    client.add_argument('--is-cast', dest='is_cast', action='store_true',
                        help='Use `call` or `cast` RPC methods')
    client.add_argument('--is-fanout', dest='is_fanout', action='store_true',
                        help='fanout=True for CAST messages')

    client.add_argument('--sync', dest='sync', choices=('call', 'fanout'),
                        help="stop server when all msg was sent by clients")

    args = parser.parse_args()

    _setup_logging(is_debug=args.debug)

    if args.config_file:
        cfg.CONF(["--config-file", args.config_file])

    if args.mode in ['rpc-server', 'rpc-client']:
        TRANSPORT = messaging.get_transport(cfg.CONF, url=args.url)
    else:
        TRANSPORT = messaging.get_notification_transport(cfg.CONF,
                                                         url=args.url)

    if args.mode in ['rpc-client', 'notify-client']:
        # always generate maximum number of messages for duration-limited tests
        generate_messages(MESSAGES_LIMIT if args.duration else args.messages)

    # oslo.config defaults
    cfg.CONF.heartbeat_interval = 5
    cfg.CONF.prog = os.path.basename(__file__)
    cfg.CONF.project = 'oslo.messaging'

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    if args.mode == 'rpc-server':
        target = messaging.Target(topic=args.topic, server=args.server)
        endpoint = rpc_server(TRANSPORT, target, args.wait_before_answer,
                              args.executor, args.duration)
        show_server_stats(endpoint, args.json_filename)

    elif args.mode == 'notify-server':
        endpoint = notify_server(TRANSPORT, args.topic,
                                 args.wait_before_answer, args.duration,
                                 args.requeue)
        show_server_stats(endpoint, args.json_filename)

    elif args.mode == 'batch-notify-server':
        endpoint = batch_notify_server(TRANSPORT, args.topic,
                                       args.wait_before_answer,
                                       args.duration, args.requeue)
        show_server_stats(endpoint, args.json_filename)

    elif args.mode == 'notify-client':
        spawn_notify_clients(args.threads, args.topic, TRANSPORT,
                             args.messages, args.wait_after_msg,
                             args.timeout, args.duration)
        show_client_stats(CLIENTS, args.json_filename)

    elif args.mode == 'rpc-client':

        targets = []
        for target in args.targets:
            # "topic.server" -> (topic, server); server is '' if omitted
            tp, srv = target.partition('.')[::2]
            t = messaging.Target(topic=tp, server=srv, fanout=args.is_fanout)
            targets.append(t)

        spawn_rpc_clients(args.threads, TRANSPORT, targets,
                          args.wait_after_msg, args.timeout, args.is_cast,
                          args.messages, args.duration, args.sync)
        show_client_stats(CLIENTS, args.json_filename, not args.is_cast)

        if args.exit_wait:
            LOG.info("Finished. waiting for %d seconds", args.exit_wait)
            time.sleep(args.exit_wait)


if __name__ == '__main__':
    CURRENT_PID = os.getpid()
    CURRENT_HOST = socket.gethostname()
    main()