##############################################################################
# Medical Image Registration ToolKit (MIRTK)
#
# Copyright 2017 Imperial College London
# Copyright 2017 Andreas Schuh
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##############################################################################

"""Auxiliary functions for batch execution of MIRTK commands using HTCondor."""

import re
import os
import sys
import subprocess
import time
import datetime
from xml.etree import ElementTree
import mirtk.utils


# ----------------------------------------------------------------------------
def submit(name, command=None, args=[], opts={}, script=None, tasks=0, deps=[],
           threads=0, memory=8 * 1024, retries=5, requirements=[], environment={},
           logdir=None, log=None, workdir=None, verbose=1):
    """Submit batch job to HTCondor."""
    if deps:
        raise NotImplementedError("Cannot submit individual HTCondor jobs with dependencies yet, this requires use of DAGMan")
    if logdir or log:
        if not logdir:
            log = os.path.abspath(log)
            logdir = os.path.dirname(log)
        elif not log:
            logdir = os.path.abspath(logdir)
            if tasks > 0:
                log = os.path.join(logdir, name + "_$(Cluster).$(Process).log")
            else:
                log = os.path.join(logdir, name + "_$(Cluster).log")
        mirtk.utils.makedirs(logdir)
    jobdesc = "universe = vanilla\n"
    if logdir:
        jobdesc += "log = {0}\n".format(os.path.join(logdir, name + ".condor.log"))
    if threads > 0:
        jobdesc += "request_cpus = {0}\n".format(threads)
    if memory > 0:
        jobdesc += "request_memory = {0}\n".format(memory)
    if requirements:
        jobdesc += "requirements = " + " && ".join(requirements) + "\n"
    if environment:
        jobdesc += "environment = \""
        for envname, envval in environment.items():
            jobdesc += " {0}='{1}'".format(envname, ':'.join(envval) if isinstance(envval, (list, tuple)) else envval)
        jobdesc += "\"\n"
    if workdir:
        jobdesc += "initialdir = {0}\n".format(os.path.abspath(workdir))
    # Note: MIRTK executables return exit code 6 when memory allocation fails,
    #       the other exit codes correspond to kill/terminate signals
    jobdesc += "on_exit_remove = (ExitBySignal == False && ExitCode != 6 && ExitCode != 247 && ExitCode != 241) || (ExitBySignal == True && ExitSignal != 9 && ExitSignal != 15)\n"
    if retries > 0:
        jobdesc += "max_retries = {0}\n".format(retries)
    if script:
        if command:
            raise ValueError("Keyword arguments 'command' and 'script' are mutually exclusive")
        if not log:
            raise ValueError("Script submission of batch to HTCondor requires log path for script file!")
        script_path = os.path.join(logdir, name + ".py")
        with open(script_path, "wt") as f:
            f.write(script.format(**opts))
        jobdesc += "executable = {0}\n".format(sys.executable)
        jobdesc += "arguments = \"'{0}'".format(script_path)
        if tasks > 0:
            jobdesc += " $(Process)"
        jobdesc += "\"\n"
        if log:
            jobdesc += "output = {0}\n".format(log)
            jobdesc += "error = {0}\n".format(log)
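

# ----------------------------------------------------------------------------
# Example: submitting a single MIRTK command as an HTCondor job -- a minimal
# sketch only. The command name, file names, and resource values below are
# hypothetical placeholders, not part of this module:
#
#     jobid = submit("resample", command="resample-image",
#                    args=["input.nii.gz", "output.nii.gz"],
#                    opts={"size": [1, 1, 1]},
#                    threads=2, memory=4 * 1024, logdir="condor_logs")
#
# With tasks > 0, the return value is a (cluster_id, num_tasks) tuple rather
# than a single cluster ID; either form can be passed on to wait() below.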


        jobdesc += "queue"
        if tasks > 0:
            jobdesc += " {}".format(tasks)
        jobdesc += "\n"
    else:
        jobdesc += "executable = {0}\n".format(sys.executable)
        jobdesc += "arguments = \"-c 'import sys; import socket;"
        jobdesc += " sys.stdout.write(\"\"Host: \"\" + socket.gethostname() + \"\"\\n\\n\"\");"
        jobdesc += " sys.path.insert(0, \"\"{0}\"\");".format(os.path.dirname(os.path.dirname(__file__)))
        jobdesc += " import mirtk; mirtk.check_call([\"\"{0}\"\"] + sys.argv[1:])'".format(command if command else name)
        for arg in args:
            arg = str(arg)
            if ' ' in arg:
                arg = "'" + arg + "'"
            jobdesc += ' ' + arg
        for opt in opts:
            arg = opts[opt]
            if opt[0] != '-':
                opt = '-' + opt
            jobdesc += ' ' + opt
            if arg is not None:
                if isinstance(arg, (list, tuple)):
                    arg = ' '.join([str(x) for x in arg])
                else:
                    arg = str(arg)
                jobdesc += ' ' + arg
        jobdesc += "\"\n"
        if log:
            jobdesc += "output = {0}\n".format(log)
            jobdesc += "error = {0}\n".format(log)
        jobdesc += "queue\n"
    proc = subprocess.Popen(["condor_submit", "-"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
    (out, err) = proc.communicate(input=jobdesc.encode('utf-8'))
    # communicate() returns bytes; decode for the string operations below
    out = out.decode('utf-8')
    err = err.decode('utf-8')
    if proc.returncode != 0:
        raise Exception(err)
    match = re.search(r'[0-9]+ job\(s\) submitted to cluster ([0-9]+)\.', out)
    if not match:
        match = re.search(r'\*\* Proc ([0-9]+)(\.[0-9]+)?:', out)
        if not match:
            raise Exception("Failed to determine job ID from condor_submit output:\n" + out)
    jobid = int(match.group(1))
    if verbose > 0:
        if tasks > 0:
            print("  Submitted batch {} (JobId={}, Tasks={})".format(name, jobid, tasks))
        else:
            print("  Submitted job {} (JobId={})".format(name, jobid))
    return jobid if tasks == 0 else (jobid, tasks)


# ----------------------------------------------------------------------------
def _parse_condor_xml(s):
    """Parse XML ClassAds returned by condor_q or condor_history with -xml option."""
    # Note: condor_history may return multiple (only one non-empty) <classads> tags
    end = 0
    classads = None
    starttag = "<classads>"
    endtag = "</classads>"
    while True:
        start = s.find(starttag, end)
        if start == -1:
            break
        end = s.find(endtag, start)
        if end == -1:
            raise ValueError("Malformed <classads> XML, could not find matching </classads>!")
        end += len(endtag)
        elem = ElementTree.fromstring(s[start:end])
        if classads is None:
            classads = elem
        else:
            classads.extend(elem)
    return classads
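

# ----------------------------------------------------------------------------
# For illustration, the ClassAds XML consumed above and queried in wait() has
# roughly the following shape (the attribute values here are made up):
#
#     <classads>
#       <c>
#         <a n="ClusterId"><i>42</i></a>
#         <a n="ProcId"><i>0</i></a>
#         <a n="JobStatus"><i>2</i></a>
#         <a n="ExitCode"><i>0</i></a>
#       </c>
#     </classads>
#
# Each <c> element is one job; wait() locates a job by matching ClusterId and
# ProcId and then reads its JobStatus (and, via condor_history, ExitCode).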


# ----------------------------------------------------------------------------
def wait(jobs, max_time=0, max_error=5, interval=60, verbose=0):
    """Wait for HTCondor jobs to finish; return True if and only if all succeeded."""
    if not isinstance(jobs, list):
        jobs = [jobs]
    clusters = []
    for job in jobs:
        if isinstance(job, tuple):
            cluid = job[0]
            tasks = job[1]
        else:
            cluid = job
            tasks = 1
        if cluid > 0:
            clusters.append((cluid, tasks))
    num_wait = len(clusters)
    num_error = 0
    total_time = 0
    iterations = 0
    while num_wait > 0 and (max_time <= 0 or total_time < max_time or total_time == 0) and num_error < max_error:
        time.sleep(interval)
        total_time += interval
        try:
            num_pending = 0
            num_running = 0
            num_suspended = 0
            num_held = 0
            num_done = 0
            for cluster, tasks in clusters:
                # condor_q returns bytes; decode before parsing the XML ClassAds
                classads = subprocess.check_output(["condor_q", "-xml", str(cluster)], stderr=subprocess.STDOUT)
                classads = _parse_condor_xml(classads.decode('utf-8'))
                for process in range(tasks):
                    classad = classads.find(".c/a[@n='ClusterId'][i='{0}']/../a[@n='ProcId'][i='{1}']/..".format(cluster, process))
                    if classad is None:
                        num_done += 1
                    else:
                        status = int(classad.find("a[@n='JobStatus']/i").text)
                        # 1) Idle
                        # 2) Running
                        # 3) Removed
                        # 4) Completed (also when failed, check ExitCode afterwards using condor_history)
                        # 5) Held
                        # 6) Transferring Output
                        # 7) Suspended
                        if status == 1:
                            num_pending += 1
                        elif status == 2 or status == 6:
                            num_running += 1
                        elif status == 3:
                            num_done += 1
                        elif status == 4:
                            num_done += 1
                        elif status == 5:
                            num_held += 1
                        elif status == 7:
                            num_suspended += 1
                        else:
                            raise Exception("Unknown job status: {}".format(status))
            num_wait = num_running + num_pending + num_held + num_suspended
            if verbose > 0 and (num_wait <= 0 or iterations % verbose == 0):
                sys.stdout.write("{:%Y-%b-%d %H:%M:%S}".format(datetime.datetime.now()))
                sys.stdout.write(" WAIT {p} pending, {r} running, {s} suspended, {h} held, {d} completed\n".format(
                    p=num_pending, r=num_running, s=num_suspended, h=num_held, d=num_done
                ))
                sys.stdout.flush()
            num_error = 0
        except subprocess.CalledProcessError:
            sys.stdout.write("{:%Y-%b-%d %H:%M:%S}".format(datetime.datetime.now()))
            sys.stdout.write(" WAIT Failed to retrieve job status, will retry {0} more times!\n".format(max_error - num_error))
            sys.stdout.flush()
            num_error += 1
        iterations += 1
    if num_error >= max_error:
        raise Exception("Exceeded maximum number of retries to query status of jobs!")
    if num_wait > 0 and max_time > 0 and total_time >= max_time:
        raise Exception("Exceeded maximum time waiting for jobs to complete!")
    num_fail = 0
    if total_time > 0:
        time.sleep(10)
        num_jobs = 0
        unknown = {}
        for cluster, tasks in clusters:
            num_jobs += tasks
            unknown[cluster] = [0] * tasks
        num_error = 0
        num_unknown = num_jobs
        while num_unknown > 0 and num_error <= max_error:
            try:
                num_fail = 0
                for cluster, tasks in clusters:
                    classads = _parse_condor_xml(subprocess.check_output(["condor_history", "-xml", str(cluster)]).decode('utf-8'))
                    for process in range(tasks):
                        classad = classads.find(".c/a[@n='ClusterId'][i='{0}']/../a[@n='ProcId'][i='{1}']/..".format(cluster, process))
                        if classad is None:
                            unknown[cluster][process] += 1
                        else:
                            unknown[cluster][process] = 0
                            status = int(classad.find("a[@n='JobStatus']/i").text)
                            exit_code = int(classad.find("a[@n='ExitCode']/i").text)
                            if status != 4 or exit_code != 0:
                                num_fail += 1
                num_unknown = 0
                for cluster, tasks in clusters:
                    for process in range(tasks):
                        if unknown[cluster][process] > 0:
                            if unknown[cluster][process] > max_error:
                                raise Exception("Could not retrieve exit code of job {}.{} for the past {} attempts!".format(cluster, process, unknown[cluster][process]))
                            num_unknown += 1
                if verbose > 0:
                    sys.stdout.write("{:%Y-%b-%d %H:%M:%S}".format(datetime.datetime.now()))
                    sys.stdout.write(" DONE {0} succeeded, {1} failed".format(num_jobs - num_fail - num_unknown, num_fail))
                    if num_unknown > 0:
                        sys.stdout.write(", {} unknown, will retry".format(num_unknown))
                    sys.stdout.write("\n")
                    sys.stdout.flush()
            except subprocess.CalledProcessError:
                sys.stdout.write("{:%Y-%b-%d %H:%M:%S}".format(datetime.datetime.now()))
                sys.stdout.write(" WAIT Failed to retrieve exit codes, will retry {0} more times!\n".format(max_error - num_error))
                sys.stdout.flush()
                num_error += 1
    if num_error >= max_error:
        raise Exception("Exceeded maximum number of retries to query exit codes of jobs!")
    return num_fail == 0
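

# ----------------------------------------------------------------------------
# Minimal end-to-end sketch, not part of the module API: it assumes that
# condor_submit, condor_q, and condor_history are on the PATH, and the job
# name, script text, and log directory below are hypothetical placeholders.
if __name__ == "__main__":
    # Submit a task array of 4 jobs, each running the generated Python script
    # with its task index ($(Process)) as the first command-line argument.
    script = "import sys\nprint('task ' + sys.argv[1])\n"
    job = submit("example", script=script, tasks=4, logdir="condor_logs")
    # Block until all tasks finished, polling condor_q every 30 seconds.
    if not wait([job], interval=30, verbose=1):
        sys.exit("One or more HTCondor jobs failed")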