1# coding: utf-8
2"""
Objects and methods to contact the resource manager to get info on the status of a job and useful statistics.
Note that this is not a wrapper for the C API but a collection of simple wrappers around the shell commands
provided by the resource manager (qsub, qdel and qstat for PBS; sinfo, squeue, ... for Slurm).
The main goal is to provide a simplified common interface for different resource managers without
having to rely on external libraries.
8"""
9
10import shlex
11
12from collections import OrderedDict, defaultdict
13from subprocess import Popen, PIPE
14from monty.collections import AttrDict
15from monty.inspect import all_subclasses
16
17import logging
18logger = logging.getLogger(__name__)
19
20
class JobStatus(int):
    """
    Integer code representing the status of a :class:`QueueJob`.

    The string form of an instance is the name associated with its value in
    `_STATUS_TABLE`. The names follow the job states of the Slurm API, see `man squeue`.

    JOB STATE CODES
       Jobs typically pass through several states in the course of their execution. The typical states are
       PENDING, RUNNING, SUSPENDED, COMPLETING, and COMPLETED. An explanation of each state follows::

        BF  BOOT_FAIL       Job  terminated  due  to launch failure, typically due to a hardware failure (e.g.
                            unable to boot the node or block and the job can not be requeued).
        CA  CANCELLED       Job was explicitly cancelled by the user or system administrator.
                            The job may or may not have been initiated.
        CD  COMPLETED       Job has terminated all processes on all nodes.
        CF  CONFIGURING     Job has been allocated resources, but are waiting for them to become ready for use (e.g. booting).
        CG  COMPLETING      Job is in the process of completing. Some processes on some  nodes may still be active.
        F   FAILED          Job terminated with non-zero exit code or other failure condition.
        NF  NODE_FAIL       Job terminated due to failure of one or more allocated nodes.
        PD  PENDING         Job is awaiting resource allocation.
        PR  PREEMPTED       Job terminated due to preemption.
        R   RUNNING         Job currently has an allocation.
        S   SUSPENDED       Job has an allocation, but execution has been suspended.
        TO  TIMEOUT         Job terminated upon reaching its time limit.
        SE  SPECIAL_EXIT    The job was requeued in a special state. This state can be set by users, typically
                            in EpilogSlurmctld, if the job has terminated with a particular exit value.

    Only the states listed in `_STATUS_TABLE` have a dedicated code; any other
    string is mapped to UNKNOWN by :meth:`from_string`.
    """

    # Integer code --> canonical status name.
    _STATUS_TABLE = OrderedDict([
        (-1, "UNKNOWN"),
        (0, "PENDING"),
        (1, "RUNNING"),
        (2, "RESIZING"),
        (3, "SUSPENDED"),
        (4, "COMPLETED"),
        (5, "CANCELLED"),
        (6, "FAILED"),
        (7, "TIMEOUT"),
        (8, "PREEMPTED"),
        (9, "NODEFAIL"),
    ])

    # Spellings reported by the resource manager that differ from the canonical
    # names above: Slurm prints NODE_FAIL (with an underscore), see the table in
    # the class docstring.
    _ALIASES = {
        "NODE_FAIL": "NODEFAIL",
    }

    def __repr__(self):
        return "<%s: %s, at %s>" % (self.__class__.__name__, str(self), id(self))

    def __str__(self):
        """Return the canonical name of the status (KeyError if the code is not in the table)."""
        return self._STATUS_TABLE[self]

    @classmethod
    def from_string(cls, s):
        """
        Return a :class:`JobStatus` instance from its string representation.

        Args:
            s: Status name, either one of the canonical names in `_STATUS_TABLE`
                or one of the alternative spellings listed in `_ALIASES`.

        Returns:
            The matching :class:`JobStatus`. Unrecognized strings do not raise:
            a warning is logged and the UNKNOWN status is returned.
        """
        name = cls._ALIASES.get(s, s)
        for num, text in cls._STATUS_TABLE.items():
            if text == name:
                return cls(num)

        # Deliberately lenient: other resource managers may report states we do not know.
        logger.warning("Got unknown status: %s", s)
        return cls.from_string("UNKNOWN")
80
81
class QueueJob(object):
    """
    Base handler used to ask the resource manager about the status of a job
    and to collect useful statistics.

    The base class only stores the job identifier, the queue name and the last
    known status, exit code and signal; its query methods return None.
    Subclasses register themselves through the `QTYPE` class attribute.
    """
    QTYPE = None

    # Used to handle other resource managers.
    S_UNKNOWN = JobStatus.from_string("UNKNOWN")
    # Status codes named after the Slurm job states.
    S_PENDING = JobStatus.from_string("PENDING")
    S_RUNNING = JobStatus.from_string("RUNNING")
    S_RESIZING = JobStatus.from_string("RESIZING")
    S_SUSPENDED = JobStatus.from_string("SUSPENDED")
    S_COMPLETED = JobStatus.from_string("COMPLETED")
    S_CANCELLED = JobStatus.from_string("CANCELLED")
    S_FAILED = JobStatus.from_string("FAILED")
    S_TIMEOUT = JobStatus.from_string("TIMEOUT")
    S_PREEMPTED = JobStatus.from_string("PREEMPTED")
    S_NODEFAIL = JobStatus.from_string("NODEFAIL")

    @staticmethod
    def from_qtype_and_id(qtype, queue_id, qname=None):
        """
        Return a new instance of the subclass whose `QTYPE` matches `qtype`.

        Args:
            qtype: String specifying the Resource manager type.
            queue_id: Job identifier.
            qname: Name of the queue (optional).

        If no subclass is registered for `qtype`, a critical message is logged
        and a plain :class:`QueueJob` is returned.
        """
        handler = None
        for candidate in all_subclasses(QueueJob):
            if candidate.QTYPE == qtype:
                handler = candidate
                break

        if handler is None:
            logger.critical("Cannot find QueueJob subclass registered for qtype %s" % qtype)
            handler = QueueJob

        return handler(queue_id, qname=qname)

    def __init__(self, queue_id, qname="UnknownQueue"):
        """
        Args:
            queue_id: Job identifier.
            qname: Name of the queue (optional).
        """
        self.qid = queue_id
        self.qname = qname

        # Nothing is known about the job until a subclass queries the resource manager.
        self.status = None
        self.exitcode = None
        self.signal = None

    def __repr__(self):
        fields = (self.__class__.__name__, self.qid, self.status, self.exitcode)
        return "<%s, qid=%s, status=%s, exit_code=%s>" % fields

    def __bool__(self):
        """A job is truthy as long as it has a job identifier."""
        return self.qid is not None

    # Python 2 spelling of __bool__.
    __nonzero__ = __bool__

    #In many cases, we only need to know if job is terminated or not
    #def is_terminated()

    @property
    def is_completed(self):
        """True if the last known status is COMPLETED."""
        return self.status == self.S_COMPLETED

    @property
    def is_running(self):
        """True if the last known status is RUNNING."""
        return self.status == self.S_RUNNING

    @property
    def is_failed(self):
        """True if the last known status is FAILED."""
        return self.status == self.S_FAILED

    @property
    def timeout(self):
        """True if the last known status is TIMEOUT."""
        return self.status == self.S_TIMEOUT

    @property
    def has_node_failures(self):
        """True if the last known status is NODEFAIL."""
        return self.status == self.S_NODEFAIL

    @property
    def unknown_status(self):
        """True if the last known status is UNKNOWN."""
        return self.status == self.S_UNKNOWN

    def set_status_exitcode_signal(self, status, exitcode, signal):
        """Store the status, the exit code and the signal of the job."""
        self.status = status
        self.exitcode = exitcode
        self.signal = signal

    def likely_code_error(self):
        """
        Return the name of the received signal if it hints at an error in the
        code (only SIGFPE is checked), False otherwise.

        See <http://man7.org/linux/man-pages/man7/signal.7.html>

        ========= =========  =======  ========================================================================
        Signal     Value     Action   Comment
        ========= =========  =======  ========================================================================
        SIGHUP        1       Term    Hangup detected on controlling terminal or death of controlling process
        SIGINT        2       Term    Interrupt from keyboard
        SIGQUIT       3       Core    Quit from keyboard
        SIGILL        4       Core    Illegal Instruction
        SIGABRT       6       Core    Abort signal from abort(3)
        SIGFPE        8       Core    Floating point exception
        SIGKILL       9       Term    Kill signal
        SIGSEGV      11       Core    Invalid memory reference
        SIGPIPE      13       Term    Broken pipe: write to pipe with no readers
        SIGALRM      14       Term    Timer signal from alarm(2)
        SIGTERM      15       Term    Termination signal
        SIGUSR1   30,10,16    Term    User-defined signal 1
        SIGUSR2   31,12,17    Term    User-defined signal 2
        SIGCHLD   20,17,18    Ign     Child stopped or terminated
        SIGCONT   19,18,25    Cont    Continue if stopped
        SIGSTOP   17,19,23    Stop    Stop process
        SIGTSTP   18,20,24    Stop    Stop typed at terminal
        SIGTTIN   21,21,26    Stop    Terminal input for background process
        SIGTTOU   22,22,27    Stop    Terminal output for background process
        ========= =========  =======  ========================================================================

        The signals SIGKILL and SIGSTOP cannot be caught, blocked, or ignored.
        Next the signals not in the POSIX.1-1990 standard but described in SUSv2 and POSIX.1-2001.

        ==========  ========= ========  ========================================
        Signal       Value     Action   Comment
        ==========  ========= ========  ========================================
        SIGBUS      10,7,10     Core    Bus error (bad memory access)
        SIGPOLL                 Term    Pollable event (Sys V).
                                        Synonym for SIGIO
        SIGPROF     27,27,29    Term    Profiling timer expired
        SIGSYS      12,31,12    Core    Bad argument to routine (SVr4)
        SIGTRAP        5        Core    Trace/breakpoint trap
        SIGURG      16,23,21    Ign     Urgent condition on socket (4.2BSD)
        SIGVTALRM   26,26,28    Term    Virtual alarm clock (4.2BSD)
        SIGXCPU     24,24,30    Core    CPU time limit exceeded (4.2BSD)
        SIGXFSZ     25,25,31    Core    File size limit exceeded (4.2BSD)
        ==========  ========= ========  ========================================
        """
        suspicious = ("SIGFPE",)
        for name in suspicious:
            if self.received_signal(name):
                return name

        return False

    def received_signal(self, sig_name):
        """
        True if the signal stored in the object equals the signal named `sig_name`.

        False if no signal has been stored, or if `sig_name` is not an
        attribute of the standard `signal` module.
        """
        if self.signal is None:
            return False

        # Translate the name into its numeric value and compare it with self.signal
        import signal
        try:
            signum = getattr(signal, sig_name)
            return self.signal == signum
        except AttributeError:
            # invalid sig_name or sig_name not available on this OS.
            return False

    def estimated_start_time(self):
        """Return date with estimated start time. None if it cannot be detected"""
        return None

    def get_info(self, **kwargs):
        """Base implementation: no information available, return None."""
        return None

    def get_nodes(self, **kwargs):
        """Base implementation: no information available, return None."""
        return None

    def get_stats(self, **kwargs):
        """Base implementation: no statistics available, return None."""
        return None
241
242
class ShellJob(QueueJob):
    """
    Handler for Shell jobs.

    The class only sets `QTYPE`; every method is inherited from :class:`QueueJob`.
    """
    QTYPE = "shell"
246
247
class SlurmJob(QueueJob):
    """
    Handler for Slurm jobs.

    The information is extracted from the output of the Slurm command line
    tools (`squeue`, `scontrol`, `sacct`). The commands are executed in text
    mode so that their output is a string on both Python 2 and Python 3.
    """
    QTYPE = "slurm"

    def estimated_start_time(self):
        """
        Return a datetime with the start time estimated by `squeue --start`.

        Returns None if the command fails, if the job is not listed in the
        output, if Slurm reports `N/A`, or if the date cannot be parsed.
        """
        #squeue  --start -j  116791
        #  JOBID PARTITION     NAME     USER  ST           START_TIME  NODES NODELIST(REASON)
        # 116791      defq gs6q2wop username  PD  2014-11-04T09:27:15     16 (QOSResourceLimit)
        cmd = "squeue --start --job %d" % self.qid
        process = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE, universal_newlines=True)
        out, err = process.communicate()

        if process.returncode != 0:
            logger.critical(err)
            return None

        from datetime import datetime
        qid = str(self.qid)
        for line in out.splitlines():
            tokens = line.split()
            # Skip blank lines, the header and rows that do not describe this job.
            if len(tokens) < 6 or tokens[0] != qid:
                continue

            date_string = tokens[5]
            if date_string == "N/A":
                return None
            try:
                return datetime.strptime(date_string, "%Y-%m-%dT%H:%M:%S")
            except ValueError:
                logger.warning("Cannot parse start time: %s", date_string)
                return None

        return None

    def get_info(self, **kwargs):
        """
        Query `scontrol show job` and update status, exit code and signal of the job.

        Returns:
            AttrDict with the keys `exitcode`, `signal` and `status` (the status
            is the string reported by Slurm). None if the command fails or if
            its output does not contain the expected fields.
        """
        # See https://computing.llnl.gov/linux/slurm/sacct.html
        #If SLURM job ids are reset, some job numbers will
        #probably appear more than once refering to different jobs.
        #Without this option only the most recent jobs will be displayed.

        #state Displays the job status, or state.
        #Output can be RUNNING, RESIZING, SUSPENDED, COMPLETED, CANCELLED, FAILED, TIMEOUT,
        #PREEMPTED or NODE_FAIL. If more information is available on the job state than will fit
        #into the current field width (for example, the uid that CANCELLED a job) the state will be followed by a "+".

        #gmatteo@master2:~
        #sacct --job 112367 --format=jobid,exitcode,state --allocations --parsable2
        #JobID|ExitCode|State
        #112367|0:0|RUNNING
        #scontrol show job 800197 --oneliner

        # For more info
        #login1$ scontrol show job 1676354

        #cmd = "sacct --job %i --format=jobid,exitcode,state --allocations --parsable2" % self.qid
        cmd = "scontrol show job %i --oneliner" % self.qid
        process = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE, universal_newlines=True)
        out, err = process.communicate()

        if process.returncode != 0:
            logger.critical(err)
            return None

        # With --oneliner the record is a sequence of whitespace-separated Key=Value
        # pairs on one line. Split on the first "=" only, because values may
        # contain "=" themselves; tokens without "=" are fragments of a value
        # containing blanks and are ignored.
        info = AttrDict()
        for token in out.split():
            key, sep, value = token.partition("=")
            if sep:
                info[key] = value

        try:
            qid = int(info["JobId"])
            exitcode = info["ExitCode"]
            status = info["JobState"]
        except (KeyError, ValueError):
            logger.critical("Cannot parse the output of `%s`:\n%s", cmd, out)
            return None

        assert qid == self.qid

        # ExitCode is reported as "exitcode:signal".
        if ":" in exitcode:
            exitcode, signum = map(int, exitcode.split(":"))
        else:
            exitcode, signum = int(exitcode), None

        # A trailing "+" means that the state has been truncated.
        i = status.find("+")
        if i != -1: status = status[:i]

        self.set_status_exitcode_signal(JobStatus.from_string(status), exitcode, signum)
        return AttrDict(exitcode=exitcode, signal=signum, status=status)

    def get_stats(self, **kwargs):
        """
        Return a dictionary built from the first record printed by `sacct --long`.

        The keys are the column names of the header line, the values are the
        fields of the first line after the header. An empty dictionary is
        returned if the command fails or prints no record.
        """
        cmd = "sacct --long --job %s --parsable2" % self.qid
        process = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE, universal_newlines=True)
        out, err = process.communicate()

        if process.returncode != 0:
            logger.critical(err)
            return {}

        lines = out.splitlines()
        if len(lines) < 2:
            # Header only (or no output at all): there is no record to report.
            return {}

        keys = lines[0].strip().split("|")
        values = lines[1].strip().split("|")
        return dict(zip(keys, values))
344
345
class PbsProJob(QueueJob):
    """
    Handler for PbsPro Jobs.

    The information is extracted from the output of `qstat`, executed in text
    mode so that the output is a string on both Python 2 and Python 3.

    See also https://github.com/plediii/pbs_util for a similar project.
    """
    QTYPE = "pbspro"
    # Mapping PbsPro --> Slurm. From `man qstat`
    #
    # S  The job's state:
    #      B  Array job has at least one subjob running.
    #      E  Job is exiting after having run.
    #      F  Job is finished.
    #      H  Job is held.
    #      M  Job was moved to another server.
    #      Q  Job is queued.
    #      R  Job is running.
    #      S  Job is suspended.
    #      T  Job is being moved to new location.
    #      U  Cycle-harvesting job is suspended due to keyboard activity.
    #      W  Job is waiting for its submitter-assigned start time to be reached.
    #      X  Subjob has completed execution or has been deleted.

    # The default factory is called without arguments: states that are not
    # listed here are mapped to S_UNKNOWN.
    PBSSTAT_TO_SLURM = defaultdict(lambda: QueueJob.S_UNKNOWN, [
        ("E", QueueJob.S_FAILED),
        ("F", QueueJob.S_COMPLETED),
        ("Q", QueueJob.S_PENDING),
        ("R", QueueJob.S_RUNNING),
        ("S", QueueJob.S_SUSPENDED),
    ])

    def estimated_start_time(self):
        """
        Return the estimated start time printed by `qstat -T` (a string, not a datetime).

        Returns None if the command fails, if it prints nothing, or if the
        start time is reported as `--` or `?`.
        """
        # qstat -T - Shows the estimated start time for all jobs in the queue.
        #                                                                           Est
        #                                                            Req'd  Req'd   Start
        #Job ID          Username Queue    Jobname    SessID NDS TSK Memory Time  S Time
        #--------------- -------- -------- ---------- ------ --- --- ------ ----- - -----
        #5669001.frontal username large    gs.Pt         --   96  96    --  03:00 Q    --
        cmd = "qstat %s -T" % self.qid
        process = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE, universal_newlines=True)
        out, err = process.communicate()

        if process.returncode != 0:
            logger.critical(err)
            return None

        # The start time is the last column of the last line.
        lines = out.splitlines()
        fields = lines[-1].split() if lines else []
        if not fields:
            return None

        sdate = fields[-1]
        if sdate in ("--", "?"):
            return None

        # TODO One should convert to datetime
        return sdate

    def get_info(self, **kwargs):
        """
        Query `qstat` and update the status of the job.

        If plain `qstat` fails, `qstat -x` is tried so that finished jobs can
        be found as well. Exit code and signal are not available and are set
        to None. The method always returns None; if both commands fail or the
        output cannot be parsed, the problem is logged and the status is left
        untouched.
        """

        # See also qstat -f
        #http://sc.tamu.edu/help/origins/batch.shtml#qstat

        #$> qstat 5666289
        #frontal1:
        #                                                            Req'd  Req'd   Elap
        #Job ID          Username Queue    Jobname    SessID NDS TSK Memory Time  S Time
        #--------------- -------- -------- ---------- ------ --- --- ------ ----- - -----
        #5666289.frontal username main_ivy MorfeoTChk  57546   1   4    --  08:00 R 00:17

        cmd = "qstat %d" % self.qid
        process = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE, universal_newlines=True)
        out, err = process.communicate()

        if process.returncode != 0:
            # qstat: 5904257.frontal1 Job has finished, use -x or -H to obtain historical job information\n
            cmd = "qstat %d -x" % self.qid
            process = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE, universal_newlines=True)
            out, err = process.communicate()

            if process.returncode != 0:
                logger.critical(out)
                logger.critical(err)
                return None

        # Here I don't know what's happening but I get an output that differs from the one obtained in the terminal.
        # Job id            Name             User              Time Use S Queue
        # ----------------  ---------------- ----------------  -------- - -----
        # 5905011.frontal1  t0               gmatteo           01:37:08 F main_wes

        # The state is the fifth column of the last line.
        lines = out.splitlines()
        fields = lines[-1].split() if lines else []
        if len(fields) < 5:
            logger.critical("Cannot parse the output of `%s`:\n%s", cmd, out)
            return None

        status = self.PBSSTAT_TO_SLURM[fields[4]]

        # Exit code and signal are not available.
        # One could use tracejob....
        # See also http://docs.adaptivecomputing.com/torque/3-0-5/a.gprologueepilogue.php
        self.set_status_exitcode_signal(status, None, None)
441
442
443#################################
444# Unsupported resource managers #
445#################################
446
class TorqueJob(QueueJob):
    """
    Not supported.

    The class only registers the "torque" qtype; every method is inherited
    from :class:`QueueJob`.
    """
    QTYPE = "torque"
450
451
class SgeJob(QueueJob):
    """
    Not supported.

    The class only registers the "sge" qtype; every method is inherited
    from :class:`QueueJob`.
    """
    QTYPE = "sge"
455
456
class MoabJob(QueueJob):
    """
    Not supported.

    The class only registers the "moab" qtype; every method is inherited
    from :class:`QueueJob`.
    """
    QTYPE = "moab"
460
461
class BlueGeneJob(QueueJob):
    """
    Not supported.

    The class only registers the "bluegene" qtype; every method is inherited
    from :class:`QueueJob`.
    """
    QTYPE = "bluegene"
465