##############################################################################
# Medical Image Registration ToolKit (MIRTK)
#
# Copyright 2017 Imperial College London
# Copyright 2017 Andreas Schuh
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##############################################################################

"""Auxiliary functions for batch execution of MIRTK commands using HTCondor."""

import re
import os
import sys
import subprocess
import time
import datetime
from xml.etree import ElementTree
import mirtk.utils


# ----------------------------------------------------------------------------
def submit(name, command=None, args=[], opts={}, script=None, tasks=0, deps=[],
           threads=0, memory=8 * 1024, retries=5, requirements=[], environment={},
           logdir=None, log=None, workdir=None, verbose=1):
    """Submit batch job to HTCondor."""
    if deps:
        raise NotImplementedError("Cannot submit individual HTCondor jobs with dependencies yet, this requires use of DAGMan")
    if logdir or log:
        if not logdir:
            log = os.path.abspath(log)
            logdir = os.path.dirname(log)
        elif not log:
            logdir = os.path.abspath(logdir)
            if tasks > 0:
                log = os.path.join(logdir, name + "_$(Cluster).$(Process).log")
            else:
                log = os.path.join(logdir, name + "_$(Cluster).log")
        mirtk.utils.makedirs(logdir)
    jobdesc = "universe = vanilla\n"
    if logdir:
        jobdesc += "log = {0}\n".format(os.path.join(logdir, name + ".condor.log"))
    if threads > 0:
        jobdesc += "request_cpus = {0}\n".format(threads)
    if memory > 0:
        jobdesc += "request_memory = {0}\n".format(memory)
    if requirements:
        jobdesc += "requirements = " + " && ".join(requirements) + "\n"
    if environment:
        jobdesc += "environment = \""
        for envname, envval in environment.items():
            jobdesc += " {0}='{1}'".format(envname, ':'.join(envval) if isinstance(envval, (list, tuple)) else envval)
        jobdesc += "\"\n"
    if workdir:
        jobdesc += "initialdir = {0}\n".format(os.path.abspath(workdir))
    # Note: MIRTK executables return exit code 6 when memory allocation fails, other codes are kill/term signals
    jobdesc += "on_exit_remove = (ExitBySignal == False && ExitCode != 6 && ExitCode != 247 && ExitCode != 241) || (ExitBySignal == True && ExitSignal != 9 && ExitSignal != 15)\n"
    if retries > 0:
        jobdesc += "max_retries = {0}\n".format(retries)
    if script:
        if command:
            raise ValueError("Keyword arguments 'command' and 'script' are mutually exclusive")
        if not log:
            raise ValueError("Script submission of a batch to HTCondor requires a log path for the script file!")
        script_path = os.path.join(logdir, name + ".py")
        with open(script_path, "wt") as f:
            f.write(script.format(**opts))
        jobdesc += "executable = {0}\n".format(sys.executable)
        jobdesc += "arguments = \"'{0}'".format(script_path)
        if tasks > 0:
            jobdesc += " $(Process)"
        jobdesc += "\"\n"
        if log:
            jobdesc += "output = {0}\n".format(log)
            jobdesc += "error = {0}\n".format(log)
        jobdesc += "queue"
        if tasks > 0:
            jobdesc += " {}".format(tasks)
        jobdesc += "\n"
    else:
        jobdesc += "executable = {0}\n".format(sys.executable)
        jobdesc += "arguments = \"-c 'import sys; import socket;"
        jobdesc += " sys.stdout.write(\"\"Host: \"\" + socket.gethostname() + \"\"\\n\\n\"\");"
        jobdesc += " sys.path.insert(0, \"\"{0}\"\");".format(os.path.dirname(os.path.dirname(__file__)))
        jobdesc += " import mirtk; mirtk.check_call([\"\"{0}\"\"] + sys.argv[1:])'".format(command if command else name)
        for arg in args:
            arg = str(arg)
            if ' ' in arg:
                arg = "'" + arg + "'"
            jobdesc += ' ' + arg
        for opt in opts:
            arg = opts[opt]
            if opt[0] != '-':
                opt = '-' + opt
            jobdesc += ' ' + opt
            if arg is not None:
                if isinstance(arg, (list, tuple)):
                    arg = ' '.join([str(x) for x in arg])
                else:
                    arg = str(arg)
                jobdesc += ' ' + arg
        jobdesc += "\"\n"
        if log:
            jobdesc += "output = {0}\n".format(log)
            jobdesc += "error = {0}\n".format(log)
        jobdesc += "queue\n"
    proc = subprocess.Popen(["condor_submit", "-"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
    (out, err) = proc.communicate(input=jobdesc.encode('utf-8'))
    # communicate() returns bytes; decode for the string processing below
    out = out.decode('utf-8')
    err = err.decode('utf-8')
    if proc.returncode != 0:
        raise Exception(err)
    match = re.search(r'[0-9]+ job\(s\) submitted to cluster ([0-9]+)\.', out)
    if not match:
        match = re.search(r'\*\* Proc ([0-9]+)(\.[0-9]+)?:', out)
        if not match:
            raise Exception("Failed to determine job ID from condor_submit output:\n" + out)
    jobid = int(match.group(1))
    if verbose > 0:
        if tasks > 0:
            print("  Submitted batch {} (JobId={}, Tasks={})".format(name, jobid, tasks))
        else:
            print("  Submitted job {} (JobId={})".format(name, jobid))
    return jobid if tasks == 0 else (jobid, tasks)
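
# Illustrative example of a submit() call; the MIRTK command name, file names,
# and option values below are assumptions for demonstration only:
#
#   jobid = submit("transform-image",
#                  args=["input.nii.gz", "output.nii.gz"],
#                  opts={"dofin": "ffd.dof.gz", "interp": "linear"},
#                  threads=4, memory=4 * 1024, retries=2,
#                  logdir="condor_logs")
#
# When a job script is submitted with tasks > 0, one process per task is queued
# and the zero-based task index is passed to the script as $(Process) argument;
# submit() then returns a (cluster_id, tasks) tuple instead of a plain job ID.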


# ----------------------------------------------------------------------------
def _parse_condor_xml(s):
    """Parse XML ClassAds returned by condor_q or condor_history with -xml option."""
    # Note: condor_history may return multiple (only one non-empty) <classads> tags
    # Output of subprocess.check_output is bytes; decode before searching for tags
    if isinstance(s, bytes):
        s = s.decode('utf-8')
    end = 0
    classads = None
    starttag = "<classads>"
    endtag = "</classads>"
    while True:
        start = s.find(starttag, end)
        if start == -1:
            break
        end = s.find(endtag, start)
        if end == -1:
            raise ValueError("Malformed <classads> XML, could not find matching </classads>!")
        end += len(endtag)
        elem = ElementTree.fromstring(s[start:end])
        if classads is None:
            classads = elem
        else:
            classads.extend(elem)
    return classads
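
# For reference, the ClassAds XML produced by "condor_q -xml" and
# "condor_history -xml" has roughly the following shape (the attribute values
# shown here are illustrative):
#
#   <classads>
#     <c>
#       <a n="ClusterId"><i>42</i></a>
#       <a n="ProcId"><i>0</i></a>
#       <a n="JobStatus"><i>2</i></a>
#     </c>
#   </classads>
#
# _parse_condor_xml() merges multiple <classads> blocks into a single element
# tree so that callers can look up a job with one XPath query such as
# ".c/a[@n='ClusterId'][i='42']/../a[@n='ProcId'][i='0']/..".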


# ----------------------------------------------------------------------------
def wait(jobs, max_time=0, max_error=5, interval=60, verbose=0):
    """Wait for HTCondor jobs to finish and return whether all completed successfully."""
    if not isinstance(jobs, list):
        jobs = [jobs]
    clusters = []
    for job in jobs:
        if isinstance(job, tuple):
            cluid = job[0]
            tasks = job[1]
        else:
            cluid = job
            tasks = 1
        if cluid > 0:
            clusters.append((cluid, tasks))
    num_wait = len(clusters)
    num_error = 0
    total_time = 0
    iterations = 0
    while num_wait > 0 and (max_time <= 0 or total_time < max_time or total_time == 0) and num_error < max_error:
        time.sleep(interval)
        total_time += interval
        try:
            num_pending = 0
            num_running = 0
            num_suspended = 0
            num_held = 0
            num_done = 0
            for cluster, tasks in clusters:
                classads = subprocess.check_output(["condor_q", "-xml", str(cluster)], stderr=subprocess.STDOUT)
                classads = _parse_condor_xml(classads)
                for process in range(tasks):
                    classad = classads.find(".c/a[@n='ClusterId'][i='{0}']/../a[@n='ProcId'][i='{1}']/..".format(cluster, process))
                    if classad is None:
                        num_done += 1
                    else:
                        status = int(classad.find("a[@n='JobStatus']/i").text)
                        # 1) Idle
                        # 2) Running
                        # 3) Removed
                        # 4) Completed (also when failed, check ExitCode afterwards using condor_history)
                        # 5) Held
                        # 6) Transferring Output
                        # 7) Suspended
                        if status == 1:
                            num_pending += 1
                        elif status == 2 or status == 6:
                            num_running += 1
                        elif status == 3:
                            num_done += 1
                        elif status == 4:
                            num_done += 1
                        elif status == 5:
                            num_held += 1
                        elif status == 7:
                            num_suspended += 1
                        else:
                            raise Exception("Unknown job status: {}".format(status))
            num_wait = num_running + num_pending + num_held + num_suspended
            if verbose > 0 and (num_wait <= 0 or iterations % verbose == 0):
                sys.stdout.write("{:%Y-%b-%d %H:%M:%S}".format(datetime.datetime.now()))
                sys.stdout.write(" WAIT {p} pending, {r} running, {s} suspended, {h} held, {d} completed\n".format(
                    p=num_pending, r=num_running, s=num_suspended, h=num_held, d=num_done
                ))
                sys.stdout.flush()
            num_error = 0
        except subprocess.CalledProcessError:
            sys.stdout.write("{:%Y-%b-%d %H:%M:%S}".format(datetime.datetime.now()))
            sys.stdout.write(" WAIT Failed to retrieve job status, will retry {0} more times!\n".format(max_error - num_error))
            sys.stdout.flush()
            num_error += 1
        iterations += 1
    if num_error >= max_error:
        raise Exception("Exceeded maximum number of retries to query status of jobs!")
    if num_wait > 0 and max_time > 0 and total_time >= max_time:
        raise Exception("Exceeded maximum time waiting for jobs to complete!")
    num_fail = 0
    if total_time > 0:
        time.sleep(10)
        num_jobs = 0
        unknown = {}
        for cluster, tasks in clusters:
            num_jobs += tasks
            unknown[cluster] = [0] * tasks
        num_error = 0
        num_unknown = num_jobs
        while num_unknown > 0 and num_error <= max_error:
            try:
                num_fail = 0
                for cluster, tasks in clusters:
                    classads = _parse_condor_xml(subprocess.check_output(["condor_history", "-xml", str(cluster)]))
                    for process in range(tasks):
                        classad = classads.find(".c/a[@n='ClusterId'][i='{0}']/../a[@n='ProcId'][i='{1}']/..".format(cluster, process))
                        if classad is None:
                            unknown[cluster][process] += 1
                        else:
                            unknown[cluster][process] = 0
                            status = int(classad.find("a[@n='JobStatus']/i").text)
                            exit_code = int(classad.find("a[@n='ExitCode']/i").text)
                            if status != 4 or exit_code != 0:
                                num_fail += 1
                num_unknown = 0
                for cluster, tasks in clusters:
                    for process in range(tasks):
                        if unknown[cluster][process] > 0:
                            if unknown[cluster][process] > max_error:
                                raise Exception("Could not retrieve exit code of job {}.{} for the past {} attempts!".format(cluster, process, unknown[cluster][process]))
                            num_unknown += 1
                if verbose > 0:
                    sys.stdout.write("{:%Y-%b-%d %H:%M:%S}".format(datetime.datetime.now()))
                    sys.stdout.write(" DONE {0} succeeded, {1} failed".format(num_jobs - num_fail - num_unknown, num_fail))
                    if num_unknown > 0:
                        sys.stdout.write(", {} unknown, will retry".format(num_unknown))
                    sys.stdout.write("\n")
                    sys.stdout.flush()
            except subprocess.CalledProcessError:
                sys.stdout.write("{:%Y-%b-%d %H:%M:%S}".format(datetime.datetime.now()))
                sys.stdout.write(" WAIT Failed to retrieve exit codes, will retry {0} more times!\n".format(max_error - num_error))
                sys.stdout.flush()
                num_error += 1
        if num_error >= max_error:
            raise Exception("Exceeded maximum number of retries to query exit codes of jobs!")
    return num_fail == 0
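
# ----------------------------------------------------------------------------
# Minimal usage sketch, guarded so it never runs on import. The command name,
# file names, and option below are illustrative assumptions, not part of this
# module; running it requires a working HTCondor pool with condor_submit,
# condor_q, and condor_history on the PATH.
if __name__ == "__main__":
    # Submit a single MIRTK command as an HTCondor vanilla universe job ...
    job = submit("smooth-image",
                 args=["input.nii.gz", "output.nii.gz"],
                 opts={"sigma": 2.0},
                 threads=1, memory=2 * 1024,
                 logdir="condor_logs", verbose=1)
    # ... then block until condor_q no longer lists it and check its exit code
    # via condor_history; wait() returns True only if all jobs succeeded.
    if wait(job, interval=30, verbose=1):
        print("Job completed successfully")
    else:
        print("Job failed")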