1#!/usr/local/bin/python3.8
2#
3# Authors: Sergey Satskiy
4#
5# $Id: libgridclient.py 309785 2011-06-28 13:46:22Z satskyse $
6#
7
8
9"""
10grid_cli utility python wrapper
11"""
12
13import os, tempfile
14from subprocess import Popen, PIPE
15
16
17
class GridClient:
    """
    Wrapper for the grid_cli command line utility.

    Each public method builds a grid_cli argument list addressed to the
    server given to the constructor (passed to grid_cli as --ns=host:port),
    runs it through safeRun() and returns the (parsed) standard output.
    A failed grid_cli invocation surfaces as the exception raised by
    safeRun().
    """

    def __init__( self, host, port, verbose = False ):
        """
        Store the server coordinates and check that grid_cli can be run.

        host    -- server host, used in the --ns option
        port    -- server port, converted to a string for the --ns option
        verbose -- if True every command is printed before it is executed

        Raises Exception if 'grid_cli help' does not finish with exit code 0.
        """
        self.__host = host
        self.__port = str( port )
        self.__verbose = verbose

        # Check for grid_cli
        if os.system( "grid_cli help > /dev/null 2>&1" ) != 0:
            raise Exception( "Cannot find grid_cli available via PATH " )

    def __printCmd( self, cmd ):
        " Handy function to print the command if needed "
        if self.__verbose:
            # print() call form: the statement form is a syntax error on
            # the python3 interpreter named in the shebang
            print( "Executing command: " + " ".join( cmd ) )

    def __nsOption( self ):
        " Provides the --ns option which identifies the server "
        return "--ns=" + self.__host + ":" + self.__port

    def __run( self, cmdLine ):
        """
        Print the command (if verbose), execute it and provide its stdout
        as text.
        """
        self.__printCmd( cmdLine )
        output = safeRun( cmdLine )
        if not isinstance( output, str ):
            # The callers below split the output on '\n', so it must be
            # text even if the runner handed back raw bytes
            output = output.decode()
        return output

    def submitJob( self, qname, jobinput, affinity = "",
                   tags = "", progress_msg = "", exclusive = False ):
        """
        Submit a job and provide the submitted job key.

        qname        -- queue name
        jobinput     -- job input string
        affinity     -- optional affinity, skipped if empty
        tags         -- optional job tag, skipped if empty
        progress_msg -- optional progress message, skipped if empty
        exclusive    -- if True the --exclusive-job option is added

        Returns the grid_cli stdout stripped of surrounding whitespace.
        """
        cmdLine = [ "grid_cli", "submitjob",
                    "--queue=" + qname,
                    "--input=" + jobinput,
                    self.__nsOption() ]
        if affinity != "":
            cmdLine.append( "--affinity=" + affinity )
        if tags != "":
            cmdLine.append( "--job-tag=" + tags )
        if progress_msg != "":
            cmdLine.append( "--progress-message=" + progress_msg )
        if exclusive:
            cmdLine.append( "--exclusive-job" )

        return self.__run( cmdLine ).strip()

    def killJob( self, qname, jobKey, auth = 'netschedule_admin' ):
        """
        Remove a job from the queue.

        qname  -- queue name
        jobKey -- key of the job to be removed
        auth   -- value of the --auth option, skipped if empty
        """
        cmdLine = [ "grid_cli", "kill", jobKey,
                    "--queue=" + qname,
                    self.__nsOption() ]
        if auth != "":
            cmdLine.append( "--auth=" + auth )

        self.__run( cmdLine )

    def killAllJobs( self, qname, auth = 'netschedule_admin' ):
        """
        Remove all the jobs from the queue.

        qname -- queue name
        auth  -- value of the --auth option, skipped if empty
        """
        # The executable is "grid_cli", the same one which __init__ checks
        # and all the other methods run.
        cmdLine = [ "grid_cli", "kill", "--all-jobs",
                    "--queue=" + qname,
                    self.__nsOption() ]
        if auth != "":
            # NOTE(review): --compat-mode is passed only together with
            # --auth; kept as it was, confirm that this is intended
            cmdLine += [ "--auth=" + auth, "--compat-mode" ]

        self.__run( cmdLine )

    def submitBatch( self, qname, jobs ):
        """
        Perform a batch submit.

        qname -- queue name
        jobs  -- non-empty list of job input strings

        Returns the list of non-empty, stripped stdout lines (one per
        submitted job key).
        Raises Exception if the jobs list is empty.
        """
        if not jobs:
            raise Exception( "Jobs list expected" )

        # Save the jobs in a temporary file
        fd, filename = tempfile.mkstemp()
        try:
            # newline = "\n" keeps the line ends exactly as they are written
            with os.fdopen( fd, "w", newline = "\n" ) as jobsFile:
                for job in jobs:
                    jobsFile.write( "input=" + job + "\n" )

            # Execute the command
            cmdLine = [ "grid_cli", "submitjob",
                        "--batch=" + str( len( jobs ) ),
                        "--input-file=" + filename,
                        "--queue=" + qname,
                        self.__nsOption() ]
            jobids = self.__run( cmdLine )
        finally:
            # Remove the temporary file regardless of the outcome
            os.unlink( filename )

        stripped = [ jobid.strip() for jobid in jobids.split( '\n' ) ]
        return [ jobid for jobid in stripped if jobid != "" ]

    def getJob( self, qname, aff = '' ):
        """
        Get a job for execution.

        qname -- queue name
        aff   -- optional affinity, skipped if empty

        Returns the stripped first line of the grid_cli stdout (the job ID);
        it is an empty string if there was no output.
        """
        cmdLine = [ "grid_cli", "requestjob",
                    "--queue=" + qname,
                    self.__nsOption() ]
        if aff != '':
            cmdLine.append( "--affinity=" + aff )

        # split() always provides at least one item so [ 0 ] is safe
        return self.__run( cmdLine ).split( '\n' )[ 0 ].strip()

    def commitJob( self, qname, jobID, retCode, out = "" ):
        """
        Commit the job.

        qname   -- queue name
        jobID   -- ID of the job to be committed
        retCode -- job return code, converted to a string
        out     -- optional job output, skipped if empty

        Returns the grid_cli stdout.
        """
        cmdLine = [ "grid_cli", "commitjob",
                    "--queue=" + qname, jobID,
                    "--return-code=" + str( retCode ),
                    self.__nsOption() ]
        if out != "":
            cmdLine.append( "--job-output=" + out )

        return self.__run( cmdLine )

    def readJobs( self, qname, count ):
        """
        Get jobs for reading.

        qname -- queue name
        count -- value of the --limit option

        Returns a tuple (groupID, jobs): the first non-empty stdout line
        converted to int and the list of the rest non-empty lines.
        If there was no output then (-1, []) is returned.
        """
        cmdLine = [ "grid_cli", "readjobs",
                    "--limit=" + str( count ),
                    "--queue=" + qname,
                    self.__nsOption() ]

        stripped = [ line.strip()
                     for line in self.__run( cmdLine ).split( '\n' ) ]
        lines = [ line for line in stripped if line != "" ]
        if not lines:
            return -1, []
        # The first line is the group ID, the rest are the jobs
        return int( lines[ 0 ] ), lines[ 1 : ]

    def confirmJobRead( self, qname, groupID, jobs ):
        """
        Confirm the fact that jobs have been read.

        qname   -- queue name
        groupID -- group ID as provided by readJobs()
        jobs    -- non-empty list of job IDs to confirm

        Returns the grid_cli stdout.
        Raises Exception if the jobs list is empty.
        """
        if not jobs:
            raise Exception( "Jobs list expected" )

        cmdLine = [ "grid_cli", "readjobs",
                    "--confirm-read=" + str( groupID ),
                    "--queue=" + qname,
                    self.__nsOption() ]
        cmdLine += [ "--job-id=" + jobid for jobid in jobs ]

        return self.__run( cmdLine )
182
183
184
185
def safeRun( commandArgs ):
    """
    Run the command and provide only its stdout.

    The stderr part delivered by safeRunWithStderr() is dropped; errors
    raised by safeRunWithStderr() propagate to the caller.
    """
    return safeRunWithStderr( commandArgs )[ 0 ]
190
191
def safeRunWithStderr( commandArgs ):
    """
    Run the given command and provide both stdout and stderr.

    commandArgs -- the command as a list: the executable followed by its
                   arguments

    Returns a tuple (stdout, stderr) of text strings. stdout is provided
    as is, stderr is stripped of the surrounding whitespace.

    Raises Exception with the stderr content in the message if the exit
    code is not 0. The only exception is 'grep': its exit code 1 means
    'no lines found' and is treated as a success too.
    Errors of starting the process (e.g. the executable is not found) are
    propagated from Popen unchanged.
    """

    # universal_newlines makes both streams text so the callers can safely
    # split the output on '\n'.
    process = Popen( commandArgs, stdin = PIPE,
                     stdout = PIPE, stderr = PIPE,
                     universal_newlines = True )

    # communicate() closes stdin, drains stdout and stderr concurrently
    # (no deadlock on a full pipe, no temporary file is needed) and waits
    # for the process to finish.
    processStdout, err = process.communicate()

    # 'grep' return codes:
    # 0 - OK, lines found
    # 1 - OK, no lines found
    # 2 - Error occurred
    isGrep = os.path.basename( commandArgs[ 0 ] ) == "grep"
    if process.returncode == 0 or \
       ( isGrep and process.returncode == 1 ):
        # No problems, the ret code is 0 or the grep special case
        return processStdout, err.strip()

    # A problem has been identified
    raise Exception( "Error in '%s' invocation: %s" % \
                     (commandArgs[0], err) )
226
227