1#!/usr/local/bin/python3.8
2#
3# Authors: Sergey Satskiy
4#
5# $Id: ns_traffic_generator.py 309785 2011-06-28 13:46:22Z satskyse $
6#
7
8"""
9NetScheduler traffic generator
10"""
11
12
13import sys, threading, time, datetime
14from optparse import OptionParser
15from libgridclient import GridClient
16from ns_traffic_settings import IndividualLoaders, SubmitDropSettings
17from submitdroploader import SubmitDropLoader
18from batchsubmitdroploader import BatchSubmitDropLoader
19from singlefullok import SingleFullOKLoopLoader
20
21
# Default NetSchedule queue name. It is used as the default value of the
# -q/--queue command line option and main() overwrites it with the value
# which was actually passed on the command line.
queueName = 'TEST'
23
24
def main():
    """
    Main function for the netschedule traffic loader.

    Parses the command line (<host> <port> plus the -v/--verbose and
    -q/--queue options), creates one loader thread for each loader enabled
    in IndividualLoaders, runs them together with an ExecutionMonitor and
    waits till all the loaders finish.

    Returns 0 on success (also when no loaders are enabled) or the
    parserError() return value if the number of arguments is wrong.
    Raises ValueError if the port is not an integer and Exception if it
    is out of the 1..65535 range.
    """

    global queueName

    parser = OptionParser(
    """
    %prog  <host>  <port>
    NetScheduler traffic generator, see ns_traffic_settings.py to configure
    """ )
    parser.add_option( "-v", "--verbose",
                       action="store_true", dest="verbose", default=False,
                       help="be verbose (default: False)" )
    parser.add_option( "-q", "--queue",
                       dest="qname", default=queueName,
                       help="NS queue name" )

    # parse the command line options
    options, args = parser.parse_args()
    verbose = options.verbose
    queueName = options.qname

    # Check the number of arguments
    if len( args ) != 2:
        return parserError( parser, "Incorrect number of arguments" )
    host = args[ 0 ]
    try:
        port = int( args[ 1 ] )
    except ValueError:
        # Same exception type as int() raises but with a readable message
        raise ValueError( "Incorrect port number: " + args[ 1 ] )
    if port <= 0 or port > 65535:
        raise Exception( "Incorrect port number" )

    if verbose:
        # Single argument print( ... ) works the same way in python 2 and 3
        print( "Using NetSchedule port: " + str( port ) )
        print( "Using NetSchedule host: " + host )

    # This instance will be shared between threads
    gridClient = GridClient( host, port, verbose )

    # Create threads and run them
    threads = []

    if IndividualLoaders.SubmitDropLoader:
        threads.append( SubmitDropLoader( gridClient, queueName ) )
    if IndividualLoaders.BatchSubmitDropLoader:
        threads.append( BatchSubmitDropLoader( gridClient, queueName ) )
    if IndividualLoaders.SingleFullOKLoopLoader:
        threads.append( SingleFullOKLoopLoader( gridClient, queueName ) )

    if not threads:
        print( "No loaders enabled. Exiting." )
        return 0

    # Run all the enabled loaders and their watcher
    monitor = ExecutionMonitor( threads )
    monitor.start()
    try:
        for thread in threads:
            thread.start()

        # Wait till all the loaders finished
        for thread in threads:
            thread.join()
    finally:
        # The monitor is a non-daemon thread which loops till it is asked
        # to stop. It must be stopped even if starting or joining the
        # loaders was interrupted (e.g. by Ctrl+C), otherwise the process
        # would never be able to exit.
        monitor.stopRequest()
        monitor.join()
    return 0
91
92
class ExecutionMonitor( threading.Thread ):
    """
    Prints the loaders statistics periodically till a stop is requested.

    threads:  the objects to report about; each of them must provide
              getName() and getCount()
    interval: number of seconds between two reports (default: 10)
    """
    def __init__( self, threads, interval = 10 ):
        threading.Thread.__init__( self )
        self.__threads = threads
        self.__interval = interval
        # An event instead of a boolean flag lets run() wake up as soon as
        # the stop is requested instead of sleeping the whole interval
        self.__stopEvent = threading.Event()
        return

    def stopRequest( self ):
        " request the thread to stop "
        self.__stopEvent.set()
        return

    def run( self ):
        " threaded function: reports till stopped plus one final report "
        while not self.__stopEvent.is_set():
            self.printOnce()
            # Returns early if stopRequest() is called while waiting
            self.__stopEvent.wait( self.__interval )
        self.printOnce()
        return

    def printOnce( self ):
        " prints statistics once: a timestamp and one line per loader "
        now = datetime.datetime.now()
        lines = [ "Timestamp: " + now.isoformat() ]
        for thread in self.__threads:
            lines.append( thread.getName() + " loader: " +
                          str( thread.getCount() ) )
        # The trailing "\n" plus the one added by print leaves an empty
        # line between two consecutive reports
        print( "\n".join( lines ) + "\n" )
        return
123
124
125
def parserError( parser, message ):
    """
    Prints the message and the parser help on stderr.

    parser:  the option parser whose help is printed
    message: the error message printed before the help

    Returns 1 so that the caller can use it as the exit code.
    """

    # Write to stderr directly: sys.stdout is left untouched so the
    # redirection does not leak to the rest of the process
    sys.stderr.write( str( message ) + "\n" )
    parser.print_help( sys.stderr )
    return 1
133
134
135# The script execution entry point
if __name__ == "__main__":
    # Exit code: the main() return value, 2 on Ctrl+C and 1 on any other
    # exception. The error messages are printed on stderr.
    try:
        returnValue = main()
    except KeyboardInterrupt:
        # Ctrl+C
        sys.stderr.write( "Ctrl + C received\n" )
        returnValue = 2

    except Exception as excpt:
        # 'except ... as ...' is accepted by both python 2.6+ and python 3
        sys.stderr.write( str( excpt ) + "\n" )
        returnValue = 1

    sys.exit( returnValue )
149
150