# coding: utf-8
"""
Objects and methods to contact the resource manager to get info on the status of the job and useful statistics.
Note that this is not a wrapper for the C API but a collection of simple wrappers around the shell commands
provided by the resource manager (qsub, qdel and qstat for PBS, sinfo, squeue... for Slurm).
The main goal indeed is providing a simplified common interface for different resource managers without
having to rely on external libraries.
"""

import shlex

from collections import OrderedDict, defaultdict
from subprocess import Popen, PIPE
from monty.collections import AttrDict
from monty.inspect import all_subclasses

import logging
logger = logging.getLogger(__name__)


def _run_command(cmd):
    """
    Run a resource-manager command without going through a shell.

    Args:
        cmd: Command line given as a single string; it is tokenized with `shlex.split`.

    Returns:
        Tuple `(returncode, stdout, stderr)`. The two streams are returned as text
        (not bytes) so that callers can split and compare them with ordinary strings.
    """
    # universal_newlines=True asks Popen to decode the pipes to text.
    process = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE, universal_newlines=True)
    out, err = process.communicate()
    return process.returncode, out, err


class JobStatus(int):
    """
    This object is an integer representing the status of a :class:`QueueJob`.

    Slurm API, see `man squeue`.

    JOB STATE CODES
    Jobs typically pass through several states in the course of their execution. The typical states are
    PENDING, RUNNING, SUSPENDED, COMPLETING, and COMPLETED. An explanation of each state follows::

        BF  BOOT_FAIL       Job terminated due to launch failure, typically due to a hardware failure (e.g.
                            unable to boot the node or block and the job can not be requeued).
        CA  CANCELLED       Job was explicitly cancelled by the user or system administrator.
                            The job may or may not have been initiated.
        CD  COMPLETED       Job has terminated all processes on all nodes.
        CF  CONFIGURING     Job has been allocated resources, but are waiting for them to become ready for use (e.g. booting).
        CG  COMPLETING      Job is in the process of completing. Some processes on some nodes may still be active.
        F   FAILED          Job terminated with non-zero exit code or other failure condition.
        NF  NODE_FAIL       Job terminated due to failure of one or more allocated nodes.
        PD  PENDING         Job is awaiting resource allocation.
        PR  PREEMPTED       Job terminated due to preemption.
        R   RUNNING         Job currently has an allocation.
        S   SUSPENDED       Job has an allocation, but execution has been suspended.
        TO  TIMEOUT         Job terminated upon reaching its time limit.
        SE  SPECIAL_EXIT    The job was requeued in a special state. This state can be set by users, typically
                            in EpilogSlurmctld, if the job has terminated with a particular exit value.

    """

    # Integer value --> name of the status.
    _STATUS_TABLE = OrderedDict([
        (-1, "UNKNOWN"),
        (0, "PENDING"),
        (1, "RUNNING"),
        (2, "RESIZING"),
        (3, "SUSPENDED"),
        (4, "COMPLETED"),
        (5, "CANCELLED"),
        (6, "FAILED"),
        (7, "TIMEOUT"),
        (8, "PREEMPTED"),
        (9, "NODEFAIL"),
    ])

    # Spellings reported by the resource manager that differ from the names in _STATUS_TABLE.
    _ALIASES = {
        "NODE_FAIL": "NODEFAIL",
    }

    def __repr__(self):
        return "<%s: %s, at %s>" % (self.__class__.__name__, str(self), id(self))

    def __str__(self):
        """String representation."""
        return self._STATUS_TABLE[self]

    @classmethod
    def from_string(cls, s):
        """
        Return a :class:`JobStatus` instance from its string representation.

        Strings that are not listed in the status table are logged with a warning
        and mapped to the UNKNOWN status instead of raising.
        """
        name = cls._ALIASES.get(s, s)
        for num, text in cls._STATUS_TABLE.items():
            if text == name:
                return cls(num)

        logger.warning("Got unknown status: %s", s)
        return cls(-1)


class QueueJob(object):
    """
    This object provides methods to contact the resource manager to get info on the status
    of the job and useful statistics. This is an abstract class.
    """
    QTYPE = None

    # Used to handle other resource managers.
    S_UNKNOWN = JobStatus.from_string("UNKNOWN")
    # Slurm status
    S_PENDING = JobStatus.from_string("PENDING")
    S_RUNNING = JobStatus.from_string("RUNNING")
    S_RESIZING = JobStatus.from_string("RESIZING")
    S_SUSPENDED = JobStatus.from_string("SUSPENDED")
    S_COMPLETED = JobStatus.from_string("COMPLETED")
    S_CANCELLED = JobStatus.from_string("CANCELLED")
    S_FAILED = JobStatus.from_string("FAILED")
    S_TIMEOUT = JobStatus.from_string("TIMEOUT")
    S_PREEMPTED = JobStatus.from_string("PREEMPTED")
    S_NODEFAIL = JobStatus.from_string("NODEFAIL")

    @staticmethod
    def from_qtype_and_id(qtype, queue_id, qname=None):
        """
        Return a new instance of the appropriate subclass.

        Falls back to the base :class:`QueueJob` (after logging a critical message)
        when no subclass declares the requested `qtype`.

        Args:
            qtype: String specifying the Resource manager type.
            queue_id: Job identifier.
            qname: Name of the queue (optional).
        """
        for cls in all_subclasses(QueueJob):
            if cls.QTYPE == qtype:
                break
        else:
            logger.critical("Cannot find QueueJob subclass registered for qtype %s", qtype)
            cls = QueueJob

        return cls(queue_id, qname=qname)

    def __init__(self, queue_id, qname="UnknownQueue"):
        """
        Args:
            queue_id: Job identifier.
            qname: Name of the queue (optional).
        """
        self.qid, self.qname = queue_id, qname

        # Initialize properties.
        self.status, self.exitcode, self.signal = None, None, None

    def __repr__(self):
        return "<%s, qid=%s, status=%s, exit_code=%s>" % (
            self.__class__.__name__, self.qid, self.status, self.exitcode)

    def __bool__(self):
        """A job evaluates to True if it has a queue identifier."""
        return self.qid is not None

    # Python 2 name of __bool__.
    __nonzero__ = __bool__

    # TODO: In many cases, we only need to know if the job is terminated or not (is_terminated).

    @property
    def is_completed(self):
        """True if the last known status is COMPLETED."""
        return self.status == self.S_COMPLETED

    @property
    def is_running(self):
        """True if the last known status is RUNNING."""
        return self.status == self.S_RUNNING

    @property
    def is_failed(self):
        """True if the last known status is FAILED."""
        return self.status == self.S_FAILED

    @property
    def timeout(self):
        """True if the last known status is TIMEOUT."""
        return self.status == self.S_TIMEOUT

    @property
    def has_node_failures(self):
        """True if the last known status is NODEFAIL."""
        return self.status == self.S_NODEFAIL

    @property
    def unknown_status(self):
        """True if the last known status is UNKNOWN."""
        return self.status == self.S_UNKNOWN

    def set_status_exitcode_signal(self, status, exitcode, signal):
        """Store status, exit code and signal of the job in the object."""
        self.status, self.exitcode, self.signal = status, exitcode, signal

    def likely_code_error(self):
        """
        Return the name of the received signal if it suggests an error in the code
        (at present only SIGFPE is checked), False otherwise.

        See <http://man7.org/linux/man-pages/man7/signal.7.html>

        ========= ========= ======= ========================================================================
        SIGHUP    1         Term    Hangup detected on controlling terminal or death of controlling process
        SIGINT    2         Term    Interrupt from keyboard
        SIGQUIT   3         Core    Quit from keyboard
        SIGILL    4         Core    Illegal Instruction
        SIGABRT   6         Core    Abort signal from abort(3)
        SIGFPE    8         Core    Floating point exception
        SIGKILL   9         Term    Kill signal
        SIGSEGV   11        Core    Invalid memory reference
        SIGPIPE   13        Term    Broken pipe: write to pipe with no readers
        SIGALRM   14        Term    Timer signal from alarm(2)
        SIGTERM   15        Term    Termination signal
        SIGUSR1   30,10,16  Term    User-defined signal 1
        SIGUSR2   31,12,17  Term    User-defined signal 2
        SIGCHLD   20,17,18  Ign     Child stopped or terminated
        SIGCONT   19,18,25  Cont    Continue if stopped
        SIGSTOP   17,19,23  Stop    Stop process
        SIGTSTP   18,20,24  Stop    Stop typed at terminal
        SIGTTIN   21,21,26  Stop    Terminal input for background process
        SIGTTOU   22,22,27  Stop    Terminal output for background process
        ========= ========= ======= ========================================================================

        The signals SIGKILL and SIGSTOP cannot be caught, blocked, or ignored.
        Next the signals not in the POSIX.1-1990 standard but described in SUSv2 and POSIX.1-2001.

        ========== ========= ======== ========================================
        Signal     Value     Action   Comment
        SIGBUS     10,7,10   Core     Bus error (bad memory access)
        SIGPOLL              Term     Pollable event (Sys V).
                                      Synonym for SIGIO
        SIGPROF    27,27,29  Term     Profiling timer expired
        SIGSYS     12,31,12  Core     Bad argument to routine (SVr4)
        SIGTRAP    5         Core     Trace/breakpoint trap
        SIGURG     16,23,21  Ign      Urgent condition on socket (4.2BSD)
        SIGVTALRM  26,26,28  Term     Virtual alarm clock (4.2BSD)
        SIGXCPU    24,24,30  Core     CPU time limit exceeded (4.2BSD)
        SIGXFSZ    25,25,31  Core     File size limit exceeded (4.2BSD)
        ========== ========= ======== ========================================
        """
        for sig_name in ("SIGFPE",):
            if self.received_signal(sig_name):
                return sig_name

        return False

    def received_signal(self, sig_name):
        """
        True if the signal stored in the object is the one named `sig_name` (e.g. "SIGFPE").

        Returns False if no signal has been recorded or if `sig_name` is not defined
        by the `signal` module on this platform.
        """
        if self.signal is None:
            return False

        # Get the numeric value from signal and compare it with self.signal
        import signal
        try:
            return self.signal == getattr(signal, sig_name)
        except AttributeError:
            # invalid sig_name or sig_name not available on this OS.
            return False

    def estimated_start_time(self):
        """Return date with estimated start time. None if it cannot be detected"""
        return None

    def get_info(self, **kwargs):
        """Query the resource manager for status info. The base class returns None."""
        return None

    def get_nodes(self, **kwargs):
        """Return info on the nodes used by the job. The base class returns None."""
        return None

    def get_stats(self, **kwargs):
        """Return statistics about the job. The base class returns None."""
        return None


class ShellJob(QueueJob):
    """Handler for Shell jobs."""
    QTYPE = "shell"


class SlurmJob(QueueJob):
    """Handler for Slurm jobs."""
    QTYPE = "slurm"

    def estimated_start_time(self):
        """
        Return a datetime with the start time estimated by `squeue --start`.
        None if the command fails or the date is not available.
        """
        #squeue --start -j  116791
        #  JOBID PARTITION     NAME     USER  ST           START_TIME  NODES NODELIST(REASON)
        # 116791      defq gs6q2wop username  PD  2014-11-04T09:27:15     16 (QOSResourceLimit)
        cmd = "squeue --start --job %d" % self.qid
        returncode, out, err = _run_command(cmd)

        if returncode != 0:
            logger.critical(err)
            return None

        from datetime import datetime
        for line in out.splitlines():
            tokens = line.split()
            if len(tokens) < 6:
                continue

            try:
                job_id = int(tokens[0])
            except ValueError:
                # Header line (JOBID ...) or any other line that does not start with a job id.
                continue

            if job_id != self.qid:
                continue

            date_string = tokens[5]
            if date_string == "N/A":
                return None

            try:
                return datetime.strptime(date_string, "%Y-%m-%dT%H:%M:%S")
            except ValueError:
                logger.warning("Cannot parse start time: %s", date_string)
                return None

        return None

    def get_info(self, **kwargs):
        """
        Call `scontrol show job` to get status, exit code and signal of the job.
        The values are stored in the object via `set_status_exitcode_signal`.

        Returns:
            AttrDict with the keys `exitcode`, `signal`, `status` (status is the string
            reported by Slurm). None if the command fails or its output cannot be interpreted.
        """
        # See https://computing.llnl.gov/linux/slurm/sacct.html
        #If SLURM job ids are reset, some job numbers will
        #probably appear more than once refering to different jobs.
        #Without this option only the most recent jobs will be displayed.

        #state Displays the job status, or state.
        #Output can be RUNNING, RESIZING, SUSPENDED, COMPLETED, CANCELLED, FAILED, TIMEOUT,
        #PREEMPTED or NODE_FAIL. If more information is available on the job state than will fit
        #into the current field width (for example, the uid that CANCELLED a job) the state will be followed by a "+".

        #gmatteo@master2:~
        #sacct --job 112367 --format=jobid,exitcode,state --allocations --parsable2
        #JobID|ExitCode|State
        #112367|0:0|RUNNING
        #scontrol show job 800197 --oneliner

        # For more info
        #login1$ scontrol show job 1676354

        cmd = "scontrol show job %i --oneliner" % self.qid
        returncode, out, err = _run_command(cmd)

        if returncode != 0:
            logger.critical(err)
            return None

        records = [line for line in out.splitlines() if line.strip()]
        if not records:
            logger.critical("Empty output from command: %s", cmd)
            return None

        # With --oneliner each record is a single line of space-separated Key=Value pairs.
        # Only the first "=" separates key and value; tokens without "=" are ignored.
        info = AttrDict()
        for token in records[0].split():
            key, sep, value = token.partition("=")
            if sep:
                info[key] = value

        try:
            qid = int(info["JobId"])
            exitcode = info["ExitCode"]
            status = info["JobState"]
        except KeyError as exc:
            logger.critical("Cannot find %s in the output of: %s", exc, cmd)
            return None

        assert qid == self.qid

        # ExitCode has the form "exitcode:signal".
        if ":" in exitcode:
            exitcode, signum = map(int, exitcode.split(":"))
        else:
            exitcode, signum = int(exitcode), None

        # Remove the "+" Slurm appends when the state has been truncated.
        status = status.partition("+")[0]

        self.set_status_exitcode_signal(JobStatus.from_string(status), exitcode, signum)
        return AttrDict(exitcode=exitcode, signal=signum, status=status)

    def get_stats(self, **kwargs):
        """
        Call `sacct --long` and return a dictionary mapping the column names found
        in the header to the values of the first record.
        Empty dictionary if the command fails or no record is printed.
        """
        cmd = "sacct --long --job %s --parsable2" % self.qid
        returncode, out, err = _run_command(cmd)

        if returncode != 0:
            logger.critical(err)
            return {}

        lines = out.splitlines()
        if len(lines) < 2:
            # Header only (or no output at all): nothing to report.
            return {}

        keys = lines[0].strip().split("|")
        values = lines[1].strip().split("|")
        return dict(zip(keys, values))


class PbsProJob(QueueJob):
    """
    Handler for PbsPro Jobs.

    See also https://github.com/plediii/pbs_util for a similar project.
    """
    QTYPE = "pbspro"
    # Mapping PrbPro --> Slurm. From `man qstat`
    #
    # S  The job's state:
    #      B  Array job has at least one subjob running.
    #      E  Job is exiting after having run.
    #      F  Job is finished.
    #      H  Job is held.
    #      M  Job was moved to another server.
    #      Q  Job is queued.
    #      R  Job is running.
    #      S  Job is suspended.
    #      T  Job is being moved to new location.
    #      U  Cycle-harvesting job is suspended due to keyboard activity.
    #      W  Job is waiting for its submitter-assigned start time to be reached.
    #      X  Subjob has completed execution or has been deleted.

    # States that are not listed here are mapped to S_UNKNOWN.
    # Note that default_factory is called without arguments.
    PBSSTAT_TO_SLURM = defaultdict(lambda: QueueJob.S_UNKNOWN, [
        ("E", QueueJob.S_FAILED),
        ("F", QueueJob.S_COMPLETED),
        ("Q", QueueJob.S_PENDING),
        ("R", QueueJob.S_RUNNING),
        ("S", QueueJob.S_SUSPENDED),
    ])

    def estimated_start_time(self):
        """
        Return the estimated start time printed by `qstat -T` (last column of the last line)
        as a string. None if the command fails or the time is not available.
        """
        # qstat -T - Shows the estimated start time for all jobs in the queue.
        #                                                                           Est
        #                                                            Req'd  Req'd   Start
        #Job ID          Username Queue    Jobname    SessID NDS TSK Memory Time  S Time
        #--------------- -------- -------- ---------- ------ --- --- ------ ----- - -----
        #5669001.frontal username large    gs.Pt          --  96  96    --  03:00 Q    --
        cmd = "qstat %s -T" % self.qid
        returncode, out, err = _run_command(cmd)

        if returncode != 0:
            logger.critical(err)
            return None

        lines = out.splitlines()
        if not lines:
            return None

        tokens = lines[-1].split()
        if not tokens:
            return None

        sdate = tokens[-1]
        if sdate in ("--", "?"):
            return None

        # TODO One should convert to datetime
        return sdate

    def get_info(self, **kwargs):
        """
        Call `qstat` (`qstat -x` if the first call fails, e.g. because the job has finished)
        and store the status of the job in the object. Exit code and signal are set to None
        because they are not available in the qstat output.

        Returns None.
        """
        # See also qstat -f
        #http://sc.tamu.edu/help/origins/batch.shtml#qstat

        #$> qstat 5666289
        #frontal1:
        #                                                            Req'd  Req'd   Elap
        #Job ID          Username Queue    Jobname    SessID NDS TSK Memory Time  S Time
        #--------------- -------- -------- ---------- ------ --- --- ------ ----- - -----
        #5666289.frontal username main_ivy MorfeoTChk  57546   1   4    --  08:00 R 00:17

        # %s (as in estimated_start_time) because the job id may also be a string such as "5666289.frontal".
        cmd = "qstat %s" % self.qid
        returncode, out, err = _run_command(cmd)

        if returncode != 0:
            # qstat: 5904257.frontal1 Job has finished, use -x or -H to obtain historical job information\n
            cmd = "qstat %s -x" % self.qid
            returncode, out, err = _run_command(cmd)

            if returncode != 0:
                logger.critical(out)
                logger.critical(err)
                return None

        # Here I don't know what's happening but I get an output that differs from the one obtained in the terminal.
        # Job id            Name             User              Time Use S Queue
        # ----------------  ---------------- ----------------  -------- - -----
        # 5905011.frontal1  t0               gmatteo           01:37:08 F main_wes

        # The state is the fifth column of the last line.
        lines = out.splitlines()
        tokens = lines[-1].split() if lines else []
        if len(tokens) > 4:
            status = self.PBSSTAT_TO_SLURM[tokens[4]]
        else:
            logger.warning("Cannot extract the job state from the output of: %s", cmd)
            status = self.S_UNKNOWN

        # Exit code and signal are not available.
        # One could use tracejob....
        # See also http://docs.adaptivecomputing.com/torque/3-0-5/a.gprologueepilogue.php
        self.set_status_exitcode_signal(status, None, None)


#################################
# Unsupported resource managers #
#################################

class TorqueJob(QueueJob):
    """Not supported"""
    QTYPE = "torque"


class SgeJob(QueueJob):
    """Not supported"""
    QTYPE = "sge"


class MoabJob(QueueJob):
    """Not supported"""
    QTYPE = "moab"


class BlueGeneJob(QueueJob):
    """Not supported"""
    QTYPE = "bluegene"