#!/usr/local/bin/python3.8
#
# Authors: Sergey Satskiy
#
# $Id: ns_traffic_generator.py 309785 2011-06-28 13:46:22Z satskyse $
#

"""
NetScheduler traffic generator
"""


import datetime
import sys
import threading
import time
from optparse import OptionParser
from libgridclient import GridClient
from ns_traffic_settings import IndividualLoaders, SubmitDropSettings
from submitdroploader import SubmitDropLoader
from batchsubmitdroploader import BatchSubmitDropLoader
from singlefullok import SingleFullOKLoopLoader


# Default NetSchedule queue name; main() overwrites it with the -q/--queue
# option value.
queueName = 'TEST'


def main():
    """Main function for the NetSchedule traffic loader.

    Parses the command line (``<host> <port>`` plus ``-v/--verbose`` and
    ``-q/--queue``), creates one GridClient shared by all loader threads,
    starts every loader enabled in ``IndividualLoaders`` together with an
    ExecutionMonitor, and waits for the loaders to finish.

    Returns:
        0 when the loaders finished or none was enabled; the value of
        parserError() (1) when the number of positional arguments is wrong.

    Raises:
        Exception: the port is not an integer in the range 1..65535.
    """

    global queueName

    parser = OptionParser(
    """
    %prog <host> <port>
    NetScheduler traffic generator, see ns_traffic_settings.py to configure
    """ )
    parser.add_option( "-v", "--verbose",
                       action="store_true", dest="verbose", default=False,
                       help="be verbose (default: False)" )
    parser.add_option( "-q", "--queue",
                       dest="qname", default=queueName,
                       help="NS queue name" )

    # parse the command line options
    options, args = parser.parse_args()
    verbose = options.verbose
    queueName = options.qname

    # Check the number of arguments
    if len( args ) != 2:
        return parserError( parser, "Incorrect number of arguments" )

    host = args[ 0 ]
    try:
        port = int( args[ 1 ] )
    except ValueError:
        # Report a non-numeric port the same way as an out-of-range one
        # instead of leaking the raw int() conversion message.
        raise Exception( "Incorrect port number" )
    if port <= 0 or port > 65535:
        raise Exception( "Incorrect port number" )

    if verbose:
        print( "Using NetSchedule port: " + str( port ) )
        print( "Using NetSchedule host: " + host )

    # This instance will be shared between threads
    gridClient = GridClient( host, port, verbose )

    # Create the enabled loader threads
    threads = []
    if IndividualLoaders.SubmitDropLoader:
        threads.append( SubmitDropLoader( gridClient, queueName ) )
    if IndividualLoaders.BatchSubmitDropLoader:
        threads.append( BatchSubmitDropLoader( gridClient, queueName ) )
    if IndividualLoaders.SingleFullOKLoopLoader:
        threads.append( SingleFullOKLoopLoader( gridClient, queueName ) )

    if not threads:
        print( "No loaders enabled. Exiting." )
        return 0

    # Run all the enabled loaders and their watcher
    monitor = ExecutionMonitor( threads )
    monitor.start()
    try:
        for thread in threads:
            thread.start()

        # Wait till all the loaders finished
        for thread in threads:
            thread.join()
    finally:
        # Always stop the watcher, even when starting/joining a loader was
        # interrupted; otherwise it would keep printing forever.
        # NOTE(review): loaders that are still running are not stopped here;
        # no stop API for them is visible in this file.
        monitor.stopRequest()
        monitor.join()
    return 0


class ExecutionMonitor( threading.Thread ):
    """Prints loader statistics periodically (every 10 seconds by default).

    Each monitored object must provide getName() and getCount(); both are
    called by printOnce().
    """

    def __init__( self, threads, interval = 10 ):
        """Remember the loaders to watch.

        Args:
            threads: iterable of loader objects to report on.
            interval: seconds between two consecutive reports.
        """
        threading.Thread.__init__( self )
        self.__threads = threads
        self.__interval = interval
        # An event instead of a plain flag lets run() wake up immediately
        # on a stop request rather than finishing a full sleep first.
        self.__stopEvent = threading.Event()

    def stopRequest( self ):
        """Request the thread to stop; run() prints once more and exits."""
        self.__stopEvent.set()

    def run( self ):
        """Threaded function: report until a stop is requested."""
        while not self.__stopEvent.is_set():
            self.printOnce()
            self.__stopEvent.wait( self.__interval )
        # Final report so the totals at the moment of stopping are shown
        self.printOnce()

    def printOnce( self ):
        """Prints the statistics once: a timestamp and a line per loader."""
        now = datetime.datetime.now()
        lines = [ "Timestamp: " + now.isoformat() + "\n" ]
        for thread in self.__threads:
            lines.append( thread.getName() + " loader: " +
                          str( thread.getCount() ) + "\n" )
        # The text already ends with a newline, so print() leaves a blank
        # separator line after each report (same as the original output).
        print( "".join( lines ) )


def parserError( parser, message ):
    """Prints the message and the option parser help on stderr.

    Args:
        parser: the OptionParser whose help should be shown.
        message: error text printed before the help.

    Returns:
        1, suitable to be used as the process exit code.
    """
    # Write to stderr explicitly; sys.stdout is left untouched.
    sys.stderr.write( message + "\n" )
    parser.print_help( sys.stderr )
    return 1


# The script execution entry point
if __name__ == "__main__":
    try:
        returnValue = main()
    except KeyboardInterrupt:
        # Ctrl+C
        sys.stderr.write( "Ctrl + C received\n" )
        returnValue = 2
    except Exception as excpt:
        sys.stderr.write( str( excpt ) + "\n" )
        returnValue = 1

    sys.exit( returnValue )