1# 2# Authors: Sergey Satskiy 3# 4# $Id: singlefullok.py 309973 2011-06-29 13:29:50Z satskyse $ 5# 6 7""" 8NetScheduler Submit/Get/Commit/Read/Confirm loader 9""" 10 11import sys, time, threading 12from ns_traffic_settings import SingleFullOKLoopSettings 13 14 15 16class SingleFullOKLoopLoader( threading.Thread ): 17 " Submit/Get/Commit/Read/Confirm loader " 18 19 def __init__( self, gridClient, qname ): 20 threading.Thread.__init__( self ) 21 self.__gridClient = gridClient 22 self.__qname = qname 23 self.__count = 0 24 return 25 26 def getName( self ): 27 " Loader identification " 28 return "Submit/Get/Commit/Read/Confirm" 29 30 def getCount( self ): 31 " Provides haw many loops completed " 32 return self.__count 33 34 def run( self ): 35 " threaded function " 36 pSize = SingleFullOKLoopSettings.packageSize 37 if pSize <= 0: 38 print >> sys.stderr, \ 39 "Invalid SingleFullOKLoopSettings.packageSize (" + \ 40 str( pSize ) + "). Must be > 0" 41 return 42 pause = SingleFullOKLoopSettings.pause 43 if pause < 0: 44 print >> sys.stderr, \ 45 "Invalid SingleFullOKLoopSettings.pause (" + \ 46 str( pause ) + "). Must be >= 0" 47 return 48 49 pCount = SingleFullOKLoopSettings.packagesCount 50 if not ( pCount == -1 or pCount > 0 ): 51 print >> sys.stderr, \ 52 "Invalid SingleFullOKLoopSettings.packagesCount (" + \ 53 str( pCount ) + "). Must be > 0 or -1" 54 return 55 56 57 # Settings are OK 58 while True: 59 pSize = SingleFullOKLoopSettings.packageSize 60 while pSize > 0: 61 jobKey = self.__safeSubmit() 62 if jobKey != "": 63 jobKey = self.__safeGet() 64 if jobKey != "": 65 if self.__safeCommit( jobKey ): 66 groupID, jobs = self.__safeRead() 67 if groupID != -1: 68 self.__safeConfirm( groupID, jobs ) 69 70 self.__count += 1 71 pSize -= 1 72 73 if pause > 0: 74 time.sleep( pause ) 75 76 if pCount == -1: # Infinite loop 77 continue 78 pCount -= 1 79 if pCount <= 0: 80 break 81 return 82 83 def __safeSubmit( self ): 84 " Exception safe submitting a job " 85 jobKey = "" 86 try: 87 jobKey = self.__gridClient.submitJob( self.__qname, 88 "bla", "ok_loop_aff" ) 89 except Exception, excp: 90 print >> sys.stderr, \ 91 "Submit/Get/Commit/Read/Confirm: Cannot submit a job" 92 print >> sys.stderr, str( excp ) 93 return jobKey 94 95 def __safeGet( self ): 96 " Exception safe getting a job " 97 jobKey = "" 98 try: 99 jobKey = self.__gridClient.getJob( self.__qname, "ok_loop_aff" ) 100 except Exception, excp: 101 print >> sys.stderr, \ 102 "Submit/Get/Commit/Read/Confirm: Cannot get a job" 103 print >> sys.stderr, str( excp ) 104 return jobKey 105 106 def __safeCommit( self, jobKey ): 107 " Exception safe commiting a job " 108 try: 109 self.__gridClient.commitJob( self.__qname, jobKey, 0 ) 110 except Exception, excp: 111 print >> sys.stderr, \ 112 "Submit/Get/Commit/Read/Confirm: Cannot commit a job (" + \ 113 jobKey + ")" 114 print >> sys.stderr, str( excp ) 115 return False 116 return True 117 118 def __safeRead( self ): 119 " Exception safe reading a job " 120 groupID = -1 121 jobs = [] 122 try: 123 groupID, jobs = self.__gridClient.readJobs( self.__qname, 1 ) 124 except Exception, excp: 125 print >> sys.stderr, \ 126 "Submit/Get/Commit/Read/Confirm: Cannot read a job" 127 print >> sys.stderr, str( excp ) 128 return groupID, jobs 129 130 def __safeConfirm( self, groupID, jobs ): 131 " Exception safe confirming reading a job " 132 try: 133 self.__gridClient.confirmJobRead( self.__qname, groupID, jobs ) 134 except Exception, excp: 135 print >> sys.stderr, \ 136 "Submit/Get/Commit/Read/Confirm: " \ 137 "Cannot confirm reading a job (" + \ 138 ", ".join( jobs ) + ")" 139 print >> sys.stderr, str( excp ) 140 return 141 142