#!/usr/local/bin/python3.8
#
# Authors: Sergey Satskiy
#
# $Id: libgridclient.py 309785 2011-06-28 13:46:22Z satskyse $
#


"""
grid_cli utility python wrapper
"""

import os, tempfile
from subprocess import Popen, PIPE



class GridClient:
    """ Wrapper for grid_cli.

    Every public method builds a grid_cli command line aimed at the
    configured host:port, optionally prints it (verbose mode) and runs it
    through safeRun(), which raises Exception on a non-zero exit code.
    """

    def __init__( self, host, port, verbose = False ):
        """ Remember the server address and check that grid_cli is runnable.

        :param host: server host name passed to grid_cli via --ns
        :param port: server port (converted to str)
        :param verbose: when True, every command line is printed before
                        it is executed
        :raises Exception: if 'grid_cli help' exits with a non-zero status
        """
        self.__host = host
        self.__port = str( port )
        self.__verbose = verbose

        # Check for grid_cli (the shell redirection assumes a POSIX shell)
        if os.system( "grid_cli help > /dev/null 2>&1" ) != 0:
            raise Exception( "Cannot find grid_cli available via PATH " )

    def __nsArg( self ):
        " Builds the --ns=host:port argument shared by all the commands "
        return "--ns=" + self.__host + ":" + self.__port

    def __printCmd( self, cmd ):
        " Handy function to print the command if needed "
        if self.__verbose:
            print( "Executing command: " + " ".join( cmd ) )

    def submitJob( self, qname, jobinput, affinity = "",
                   tags = "", progress_msg = "", exclusive = False ):
        """ Submit a job and provide the submitted job key.

        Optional arguments are only passed to grid_cli when non-empty/True.
        :returns: stripped stdout of 'grid_cli submitjob'
        """
        cmdLine = [ "grid_cli", "submitjob",
                    "--queue=" + qname,
                    "--input=" + jobinput,
                    self.__nsArg() ]
        if affinity != "":
            cmdLine.append( "--affinity=" + affinity )
        if tags != "":
            cmdLine.append( "--job-tag=" + tags )
        if progress_msg != "":
            cmdLine.append( "--progress-message=" + progress_msg )
        if exclusive:
            cmdLine.append( "--exclusive-job" )

        self.__printCmd( cmdLine )
        return safeRun( cmdLine ).strip()

    def killJob( self, qname, jobKey, auth = 'netschedule_admin' ):
        " Removes a job from the queue; --auth is omitted when auth is empty "
        cmdLine = [ "grid_cli", "kill", jobKey,
                    "--queue=" + qname,
                    self.__nsArg() ]
        if auth != "":
            cmdLine.append( "--auth=" + auth )

        self.__printCmd( cmdLine )
        safeRun( cmdLine )

    def killAllJobs( self, qname, auth = 'netschedule_admin' ):
        " Removes all the jobs from the queue "
        # NOTE(review): this is the only command that runs '__grid_cli'
        # rather than 'grid_cli', and '--compat-mode' is only added together
        # with --auth. Kept as-is in case it is deliberate -- confirm.
        cmdLine = [ "__grid_cli", "kill", "--all-jobs",
                    "--queue=" + qname,
                    self.__nsArg() ]
        if auth != "":
            cmdLine += [ "--auth=" + auth, "--compat-mode" ]

        self.__printCmd( cmdLine )
        safeRun( cmdLine )

    def submitBatch( self, qname, jobs ):
        """ Performs a batch submit.

        The jobs are written to a temporary file, one 'input=<job>' line
        each, which is handed to grid_cli via --input-file. The file is
        removed whether or not the command succeeds.

        :param jobs: non-empty sequence of job input strings
        :returns: list of non-empty, stripped lines of grid_cli stdout
        :raises Exception: if jobs is empty or grid_cli fails
        """
        if len( jobs ) <= 0:
            raise Exception( "Jobs list expected" )

        fd, filename = tempfile.mkstemp()
        try:
            # os.fdopen takes ownership of fd and closes it on exit
            with os.fdopen( fd, "w" ) as batchFile:
                for job in jobs:
                    batchFile.write( "input=" + job + "\n" )

            cmdLine = [ "grid_cli", "submitjob",
                        "--batch=" + str( len( jobs ) ),
                        "--input-file=" + filename,
                        "--queue=" + qname,
                        self.__nsArg() ]
            self.__printCmd( cmdLine )
            jobids = safeRun( cmdLine )
        finally:
            os.unlink( filename )

        return [ jobid.strip() for jobid in jobids.split( '\n' )
                 if jobid.strip() != "" ]

    def getJob( self, qname, aff = '' ):
        """ Get a job for execution.

        :returns: the first stdout line of 'grid_cli requestjob', stripped
                  (the job ID); "" when grid_cli printed nothing
        """
        cmdLine = [ "grid_cli", "requestjob",
                    "--queue=" + qname,
                    self.__nsArg() ]
        if aff != '':
            cmdLine.append( "--affinity=" + aff )

        self.__printCmd( cmdLine )
        # str.split always yields at least one item, so [0] is always valid
        return safeRun( cmdLine ).split( '\n', 1 )[ 0 ].strip()

    def commitJob( self, qname, jobID, retCode, out = "" ):
        """ Commits the job.

        :returns: raw stdout of 'grid_cli commitjob'
        """
        cmdLine = [ "grid_cli", "commitjob",
                    "--queue=" + qname, jobID,
                    "--return-code=" + str( retCode ),
                    self.__nsArg() ]
        if out != "":
            cmdLine.append( "--job-output=" + out )

        self.__printCmd( cmdLine )
        return safeRun( cmdLine )

    def readJobs( self, qname, count ):
        """ Get jobs for reading.

        :returns: (groupID, jobs) where groupID is the first non-empty stdout
                  line parsed as int (-1 if there was no output) and jobs is
                  the list of the remaining non-empty stripped lines
        :raises ValueError: if the first non-empty line is not an integer
        """
        cmdLine = [ "grid_cli", "readjobs",
                    "--limit=" + str( count ),
                    "--queue=" + qname,
                    self.__nsArg() ]

        self.__printCmd( cmdLine )
        groupID = -1
        jobs = []
        for line in safeRun( cmdLine ).split( '\n' ):
            line = line.strip()
            if line == "":
                continue
            if groupID == -1:
                groupID = int( line )
                continue
            jobs.append( line )
        return groupID, jobs

    def confirmJobRead( self, qname, groupID, jobs ):
        """ Confirms the fact that jobs have been read.

        :raises Exception: if jobs is empty or grid_cli fails
        :returns: raw stdout of 'grid_cli readjobs --confirm-read'
        """
        if len( jobs ) <= 0:
            raise Exception( "Jobs list expected" )

        cmdLine = [ "grid_cli", "readjobs",
                    "--confirm-read=" + str( groupID ),
                    "--queue=" + qname,
                    self.__nsArg() ]
        cmdLine += [ "--job-id=" + jobid for jobid in jobs ]

        self.__printCmd( cmdLine )
        return safeRun( cmdLine )




def safeRun( commandArgs ):
    " Provides the process stdout (text); raises Exception on failure "
    stdOut, stdErr = safeRunWithStderr( commandArgs )
    return stdOut


def safeRunWithStderr( commandArgs ):
    """ Runs the given command and provides both stdout and stderr.

    The child's stdin is closed immediately. Output is decoded as text.

    :param commandArgs: argument list; commandArgs[0] is the executable
    :returns: (stdout, stripped stderr)
    :raises Exception: if the exit code is non-zero, except that exit
                       code 1 of 'grep' (no lines found) counts as success
    """
    # communicate() drains stdout and stderr concurrently (no pipe can fill
    # up and block the child), closes stdin since no input is given, and
    # waits for the process -- no temporary file to leak or clean up.
    process = Popen( commandArgs, stdin = PIPE,
                     stdout = PIPE, stderr = PIPE,
                     universal_newlines = True )
    processStdout, err = process.communicate()

    # 'grep' return codes:
    # 0 - OK, lines found
    # 1 - OK, no lines found
    # 2 - Error occurred
    isGrepNoMatch = os.path.basename( commandArgs[ 0 ] ) == "grep" and \
                    process.returncode == 1

    if process.returncode == 0 or isGrepNoMatch:
        # No problems, the ret code is 0 or the grep special case
        return processStdout, err.strip()

    # A problem has been identified
    raise Exception( "Error in '%s' invocation: %s" %
                     ( commandArgs[ 0 ], err ) )