1# coding: utf-8 2# Copyright (c) Pymatgen Development Team. 3# Distributed under the terms of the MIT License. 4"""Tools for the submission of Tasks.""" 5 6import os 7import time 8import ruamel.yaml as yaml 9import pickle 10 11from collections import deque 12from datetime import timedelta 13from io import StringIO 14from monty.io import get_open_fds 15from monty.string import boxed, is_string 16from monty.os.path import which 17from monty.collections import AttrDict, dict2namedtuple 18from monty.termcolor import cprint 19from .utils import as_bool, File, Directory 20from . import qutils as qu 21from pymatgen.util.io_utils import ask_yesno 22 23try: 24 import apscheduler 25 has_apscheduler = True 26 has_sched_v3 = apscheduler.version >= "3.0.0" 27except ImportError: 28 has_apscheduler = False 29 30import logging 31logger = logging.getLogger(__name__) 32 33__all__ = [ 34 "ScriptEditor", 35 "PyLauncher", 36 "PyFlowScheduler", 37] 38 39 40def straceback(): 41 """Returns a string with the traceback.""" 42 import traceback 43 return traceback.format_exc() 44 45 46class ScriptEditor(object): 47 """Simple editor that simplifies the writing of shell scripts""" 48 _shell = '/bin/bash' 49 50 def __init__(self): 51 self._lines = [] 52 53 @property 54 def shell(self): 55 return self._shell 56 57 def _add(self, text, pre=""): 58 if is_string(text): 59 self._lines.append(pre + text) 60 else: 61 self._lines.extend([pre + t for t in text]) 62 63 def reset(self): 64 """Reset the editor.""" 65 try: 66 del self._lines 67 except AttributeError: 68 pass 69 70 def shebang(self): 71 """Adds the shebang line.""" 72 self._lines.append('#!' + self.shell) 73 74 def declare_var(self, key, val): 75 """Declare a env variable. If val is None the variable is unset.""" 76 if val is not None: 77 line = "export " + key + '=' + str(val) 78 else: 79 line = "unset " + key 80 81 self._add(line) 82 83 def declare_vars(self, d): 84 """Declare the variables defined in the dictionary d.""" 85 for k, v in d.items(): 86 self.declare_var(k, v) 87 88 def export_envar(self, key, val): 89 """Export an environment variable.""" 90 line = "export " + key + "=" + str(val) 91 self._add(line) 92 93 def export_envars(self, env): 94 """Export the environment variables contained in the dict env.""" 95 for k, v in env.items(): 96 self.export_envar(k, v) 97 98 def add_emptyline(self): 99 """Add an empty line.""" 100 self._add("", pre="") 101 102 def add_comment(self, comment): 103 """Add a comment""" 104 self._add(comment, pre="# ") 105 106 def load_modules(self, modules): 107 """Load the list of specified modules.""" 108 for module in modules: 109 self.load_module(module) 110 111 def load_module(self, module): 112 self._add('module load ' + module + " 2>> mods.err") 113 114 def add_line(self, line): 115 self._add(line) 116 117 def add_lines(self, lines): 118 self._add(lines) 119 120 def get_script_str(self, reset=True): 121 """Returns a string with the script and reset the editor if reset is True""" 122 s = "\n".join(l for l in self._lines) 123 if reset: 124 self.reset() 125 return s 126 127 128class PyLauncherError(Exception): 129 """Error class for PyLauncher.""" 130 131 132class PyLauncher(object): 133 """This object handle the submission of the tasks contained in a |Flow|.""" 134 Error = PyLauncherError 135 136 def __init__(self, flow, **kwargs): 137 """ 138 Initialize the object 139 140 Args: 141 flow: |Flow| object 142 max_njobs_inqueue: The launcher will stop submitting jobs when the 143 number of jobs in the queue is >= Max number of jobs 144 """ 145 self.flow = flow 146 self.max_njobs_inqueue = kwargs.get("max_njobs_inqueue", 200) 147 148 #self.flow.check_pid_file() 149 150 def single_shot(self): 151 """ 152 Run the first :class:`Task` than is ready for execution. 153 154 Returns: 155 Number of jobs launched. 156 """ 157 num_launched = 0 158 159 # Get the tasks that can be executed in each workflow. 160 tasks = [] 161 for work in self.flow: 162 try: 163 task = work.fetch_task_to_run() 164 165 if task is not None: 166 tasks.append(task) 167 else: 168 # No task found, this usually happens when we have dependencies. 169 # Beware of possible deadlocks here! 170 logger.debug("No task to run! Possible deadlock") 171 172 except StopIteration: 173 logger.info("All tasks completed.") 174 175 # Submit the tasks and update the database. 176 if tasks: 177 tasks[0].start() 178 num_launched += 1 179 180 self.flow.pickle_dump() 181 182 return num_launched 183 184 def rapidfire(self, max_nlaunch=-1, max_loops=1, sleep_time=5): 185 """ 186 Keeps submitting `Tasks` until we are out of jobs or no job is ready to run. 187 188 Args: 189 max_nlaunch: Maximum number of launches. default: no limit. 190 max_loops: Maximum number of loops 191 sleep_time: seconds to sleep between rapidfire loop iterations 192 193 Returns: 194 The number of tasks launched. 195 """ 196 num_launched, do_exit, launched = 0, False, [] 197 198 for count in range(max_loops): 199 if do_exit: 200 break 201 if count > 0: 202 time.sleep(sleep_time) 203 204 tasks = self.fetch_tasks_to_run() 205 206 # I don't know why but we receive duplicated tasks. 207 if any(task in launched for task in tasks): 208 logger.critical("numtasks %d already in launched list:\n%s" % (len(tasks), launched)) 209 210 # Preventive test. 211 tasks = [t for t in tasks if t not in launched] 212 213 if not tasks: 214 continue 215 216 for task in tasks: 217 fired = task.start() 218 if fired: 219 launched.append(task) 220 num_launched += 1 221 222 if num_launched >= max_nlaunch > 0: 223 logger.info('num_launched >= max_nlaunch, going back to sleep') 224 do_exit = True 225 break 226 227 # Update the database. 228 self.flow.pickle_dump() 229 230 return num_launched 231 232 def fetch_tasks_to_run(self): 233 """ 234 Return the list of tasks that can be submitted. 235 Empty list if no task has been found. 236 """ 237 tasks_to_run = [] 238 239 for work in self.flow: 240 tasks_to_run.extend(work.fetch_alltasks_to_run()) 241 242 return tasks_to_run 243 244 245class PyFlowSchedulerError(Exception): 246 """Exceptions raised by `PyFlowScheduler`.""" 247 248 249class PyFlowScheduler(object): 250 """ 251 This object schedules the submission of the tasks in a |Flow|. 252 There are two types of errors that might occur during the execution of the jobs: 253 254 #. Python exceptions 255 #. Errors in the ab-initio code 256 257 Python exceptions are easy to detect and are usually due to a bug in the python code or random errors such as IOError. 258 The set of errors in the ab-initio is much much broader. It includes wrong input data, segmentation 259 faults, problems with the resource manager, etc. The flow tries to handle the most common cases 260 but there's still a lot of room for improvement. 261 Note, in particular, that `PyFlowScheduler` will shutdown automatically in the following cases: 262 263 #. The number of python exceptions is > max_num_pyexcs 264 265 #. The number of task errors (i.e. the number of tasks whose status is S_ERROR) is > max_num_abierrs 266 267 #. The number of jobs launched becomes greater than (`safety_ratio` * total_number_of_tasks). 268 269 #. The scheduler will send an email to the user (specified by `mailto`) every `remindme_s` seconds. 270 If the mail cannot be sent, the scheduler will shutdown automatically. 271 This check prevents the scheduler from being trapped in an infinite loop. 272 """ 273 # Configuration file. 274 YAML_FILE = "scheduler.yml" 275 USER_CONFIG_DIR = os.path.join(os.path.expanduser("~"), ".abinit", "abipy") 276 277 Error = PyFlowSchedulerError 278 279 @classmethod 280 def autodoc(cls): 281 i = cls.__init__.__doc__.index("Args:") 282 return cls.__init__.__doc__[i+5:] 283 284 def __init__(self, **kwargs): 285 """ 286 Args: 287 weeks: number of weeks to wait (DEFAULT: 0). 288 days: number of days to wait (DEFAULT: 0). 289 hours: number of hours to wait (DEFAULT: 0). 290 minutes: number of minutes to wait (DEFAULT: 0). 291 seconds: number of seconds to wait (DEFAULT: 0). 292 mailto: The scheduler will send an email to `mailto` every `remindme_s` seconds. 293 (DEFAULT: None i.e. not used). 294 verbose: (int) verbosity level. (DEFAULT: 0) 295 use_dynamic_manager: "yes" if the |TaskManager| must be re-initialized from 296 file before launching the jobs. (DEFAULT: "no") 297 max_njobs_inqueue: Limit on the number of jobs that can be present in the queue. (DEFAULT: 200) 298 max_ncores_used: Maximum number of cores that can be used by the scheduler. 299 remindme_s: The scheduler will send an email to the user specified 300 by `mailto` every `remindme_s` seconds. (int, DEFAULT: 1 day). 301 max_num_pyexcs: The scheduler will exit if the number of python exceptions is > max_num_pyexcs 302 (int, DEFAULT: 0) 303 max_num_abierrs: The scheduler will exit if the number of errored tasks is > max_num_abierrs 304 (int, DEFAULT: 0) 305 safety_ratio: The scheduler will exits if the number of jobs launched becomes greater than 306 `safety_ratio` * total_number_of_tasks_in_flow. (int, DEFAULT: 5) 307 max_nlaunches: Maximum number of tasks launched in a single iteration of the scheduler. 308 (DEFAULT: -1 i.e. no limit) 309 debug: Debug level. Use 0 for production (int, DEFAULT: 0) 310 fix_qcritical: "yes" if the launcher should try to fix QCritical Errors (DEFAULT: "no") 311 rmflow: If "yes", the scheduler will remove the flow directory if the calculation 312 completed successfully. (DEFAULT: "no") 313 killjobs_if_errors: "yes" if the scheduler should try to kill all the runnnig jobs 314 before exiting due to an error. (DEFAULT: "yes") 315 """ 316 # Options passed to the scheduler. 317 self.sched_options = AttrDict( 318 weeks=kwargs.pop("weeks", 0), 319 days=kwargs.pop("days", 0), 320 hours=kwargs.pop("hours", 0), 321 minutes=kwargs.pop("minutes", 0), 322 seconds=kwargs.pop("seconds", 0), 323 #start_date=kwargs.pop("start_date", None), 324 ) 325 if all(not v for v in self.sched_options.values()): 326 raise self.Error("Wrong set of options passed to the scheduler.") 327 328 self.mailto = kwargs.pop("mailto", None) 329 self.verbose = int(kwargs.pop("verbose", 0)) 330 self.use_dynamic_manager = as_bool(kwargs.pop("use_dynamic_manager", False)) 331 self.max_njobs_inqueue = kwargs.pop("max_njobs_inqueue", 200) 332 self.max_ncores_used = kwargs.pop("max_ncores_used", None) 333 self.contact_resource_manager = as_bool(kwargs.pop("contact_resource_manager", False)) 334 335 self.remindme_s = float(kwargs.pop("remindme_s", 1 * 24 * 3600)) 336 self.max_num_pyexcs = int(kwargs.pop("max_num_pyexcs", 0)) 337 self.max_num_abierrs = int(kwargs.pop("max_num_abierrs", 0)) 338 self.safety_ratio = int(kwargs.pop("safety_ratio", 5)) 339 #self.max_etime_s = kwargs.pop("max_etime_s", ) 340 self.max_nlaunches = kwargs.pop("max_nlaunches", -1) 341 self.debug = kwargs.pop("debug", 0) 342 self.fix_qcritical = as_bool(kwargs.pop("fix_qcritical", False)) 343 self.rmflow = as_bool(kwargs.pop("rmflow", False)) 344 self.killjobs_if_errors = as_bool(kwargs.pop("killjobs_if_errors", True)) 345 346 self.customer_service_dir = kwargs.pop("customer_service_dir", None) 347 if self.customer_service_dir is not None: 348 self.customer_service_dir = Directory(self.customer_service_dir) 349 self._validate_customer_service() 350 351 if kwargs: 352 raise self.Error("Unknown arguments %s" % kwargs) 353 354 if not has_apscheduler: 355 raise RuntimeError("Install apscheduler with pip") 356 357 if has_sched_v3: 358 logger.warning("Using scheduler v>=3.0.0") 359 from apscheduler.schedulers.blocking import BlockingScheduler 360 self.sched = BlockingScheduler() 361 else: 362 from apscheduler.scheduler import Scheduler 363 self.sched = Scheduler(standalone=True) 364 365 self.nlaunch = 0 366 self.num_reminders = 1 367 368 # Used to keep track of the exceptions raised while the scheduler is running 369 self.exceptions = deque(maxlen=self.max_num_pyexcs + 10) 370 371 # Used to push additional info during the execution. 372 self.history = deque(maxlen=100) 373 374 @classmethod 375 def from_file(cls, filepath): 376 """Read the configuration parameters from a Yaml file.""" 377 with open(filepath, "rt") as fh: 378 return cls(**yaml.safe_load(fh)) 379 380 @classmethod 381 def from_string(cls, s): 382 """Create an istance from string s containing a YAML dictionary.""" 383 stream = StringIO(s) 384 stream.seek(0) 385 return cls(**yaml.safe_load(stream)) 386 387 @classmethod 388 def from_user_config(cls): 389 """ 390 Initialize the :class:`PyFlowScheduler` from the YAML file 'scheduler.yml'. 391 Search first in the working directory and then in the configuration directory of abipy. 392 393 Raises: 394 `RuntimeError` if file is not found. 395 """ 396 # Try in the current directory. 397 path = os.path.join(os.getcwd(), cls.YAML_FILE) 398 399 if os.path.exists(path): 400 return cls.from_file(path) 401 402 # Try in the configuration directory. 403 path = os.path.join(cls.USER_CONFIG_DIR, cls.YAML_FILE) 404 405 if os.path.exists(path): 406 return cls.from_file(path) 407 408 raise cls.Error("Cannot locate %s neither in current directory nor in %s" % (cls.YAML_FILE, path)) 409 410 def __str__(self): 411 """String representation.""" 412 lines = [self.__class__.__name__ + ", Pid: %d" % self.pid] 413 app = lines.append 414 app("Scheduler options: %s" % str(self.sched_options)) 415 416 if self.flow is not None: 417 app(80 * "=") 418 app(str(self.flow)) 419 420 return "\n".join(lines) 421 422 @property 423 def pid(self): 424 """The pid of the process associated to the scheduler.""" 425 try: 426 return self._pid 427 except AttributeError: 428 self._pid = os.getpid() 429 return self._pid 430 431 @property 432 def pid_file(self): 433 """ 434 Absolute path of the file with the pid. 435 The file is located in the workdir of the flow 436 """ 437 return self._pid_file 438 439 @property 440 def flow(self): 441 """`Flow`.""" 442 try: 443 return self._flow 444 except AttributeError: 445 return None 446 447 @property 448 def num_excs(self): 449 """Number of exceptions raised so far.""" 450 return len(self.exceptions) 451 452 def get_delta_etime(self): 453 """Returns a `timedelta` object representing with the elapsed time.""" 454 return timedelta(seconds=(time.time() - self.start_time)) 455 456 def add_flow(self, flow): 457 """ 458 Add a flow to the scheduler. 459 """ 460 if hasattr(self, "_flow"): 461 raise self.Error("Only one flow can be added to the scheduler.") 462 463 # Check if we are already using a scheduler to run this flow 464 flow.check_pid_file() 465 flow.set_spectator_mode(False) 466 467 # Build dirs and files (if not yet done) 468 flow.build() 469 470 with open(flow.pid_file, "wt") as fh: 471 fh.write(str(self.pid)) 472 473 self._pid_file = flow.pid_file 474 self._flow = flow 475 476 def _validate_customer_service(self): 477 """ 478 Validate input parameters if customer service is on then 479 create directory for tarball files with correct premissions for user and group. 480 """ 481 direc = self.customer_service_dir 482 if not direc.exists: 483 mode = 0o750 484 print("Creating customer_service_dir %s with mode %s" % (direc, mode)) 485 direc.makedirs() 486 os.chmod(direc.path, mode) 487 488 if self.mailto is None: 489 raise RuntimeError("customer_service_dir requires mailto option in scheduler.yml") 490 491 def _do_customer_service(self): 492 """ 493 This method is called before the shutdown of the scheduler. 494 If customer_service is on and the flow didn't completed successfully, 495 a lightweight tarball file with inputs and the most important output files 496 is created in customer_servide_dir. 497 """ 498 if self.customer_service_dir is None: return 499 doit = self.exceptions or not self.flow.all_ok 500 doit = True 501 if not doit: return 502 503 prefix = os.path.basename(self.flow.workdir) + "_" 504 505 import tempfile, datetime 506 suffix = str(datetime.datetime.now()).replace(" ", "-") 507 # Remove milliseconds 508 i = suffix.index(".") 509 if i != -1: suffix = suffix[:i] 510 suffix += ".tar.gz" 511 512 #back = os.getcwd() 513 #os.chdir(self.customer_service_dir.path) 514 515 _, tmpname = tempfile.mkstemp(suffix="_" + suffix, prefix=prefix, 516 dir=self.customer_service_dir.path, text=False) 517 518 print("Dear customer,\n We are about to generate a tarball in\n %s" % tmpname) 519 self.flow.make_light_tarfile(name=tmpname) 520 #os.chdir(back) 521 522 def start(self): 523 """ 524 Starts the scheduler in a new thread. Returns 0 if success. 525 In standalone mode, this method will block until there are no more scheduled jobs. 526 """ 527 self.history.append("Started on %s" % time.asctime()) 528 self.start_time = time.time() 529 530 if not has_apscheduler: 531 raise RuntimeError("Install apscheduler with pip") 532 533 if has_sched_v3: 534 self.sched.add_job(self.callback, "interval", **self.sched_options) 535 else: 536 self.sched.add_interval_job(self.callback, **self.sched_options) 537 538 errors = self.flow.look_before_you_leap() 539 if errors: 540 self.exceptions.append(errors) 541 return 1 542 543 # Try to run the job immediately. If something goes wrong return without initializing the scheduler. 544 self._runem_all() 545 546 if self.exceptions: 547 self.cleanup() 548 self.send_email(msg="Error while trying to run the flow for the first time!\n %s" % self.exceptions) 549 return 1 550 551 try: 552 self.sched.start() 553 return 0 554 555 except KeyboardInterrupt: 556 self.shutdown(msg="KeyboardInterrupt from user") 557 if ask_yesno("Do you want to cancel all the jobs in the queue? [Y/n]"): 558 print("Number of jobs cancelled:", self.flow.cancel()) 559 560 self.flow.pickle_dump() 561 return -1 562 563 def _runem_all(self): 564 """ 565 This function checks the status of all tasks, 566 tries to fix tasks that went unconverged, abicritical, or queuecritical 567 and tries to run all the tasks that can be submitted.+ 568 """ 569 excs = [] 570 flow = self.flow 571 572 # Allow to change the manager at run-time 573 if self.use_dynamic_manager: 574 from pymatgen.io.abinit.tasks import TaskManager 575 new_manager = TaskManager.from_user_config() 576 for work in flow: 577 work.set_manager(new_manager) 578 579 nqjobs = 0 580 if self.contact_resource_manager: # and flow.TaskManager.qadapter.QTYPE == "shell": 581 # This call is expensive and therefore it's optional (must be activate in manager.yml) 582 nqjobs = flow.get_njobs_in_queue() 583 if nqjobs is None: 584 nqjobs = 0 585 if flow.manager.has_queue: 586 logger.warning('Cannot get njobs_inqueue') 587 else: 588 # Here we just count the number of tasks in the flow who are running. 589 # This logic breaks down if there are multiple schedulers runnig 590 # but it's easy to implement without having to contact the resource manager. 591 nqjobs = (len(list(flow.iflat_tasks(status=flow.S_RUN))) + 592 len(list(flow.iflat_tasks(status=flow.S_SUB)))) 593 594 if nqjobs >= self.max_njobs_inqueue: 595 print("Too many jobs in the queue: %s. No job will be submitted." % nqjobs) 596 flow.check_status(show=False) 597 return 598 599 if self.max_nlaunches == -1: 600 max_nlaunch = self.max_njobs_inqueue - nqjobs 601 else: 602 max_nlaunch = min(self.max_njobs_inqueue - nqjobs, self.max_nlaunches) 603 604 # check status. 605 flow.check_status(show=False) 606 607 # This check is not perfect, we should make a list of tasks to sumbit 608 # and select only the subset so that we don't exceeed mac_ncores_used 609 # Many sections of this code should be rewritten. 610 #if self.max_ncores_used is not None and flow.ncores_used > self.max_ncores_used: 611 if self.max_ncores_used is not None and flow.ncores_allocated > self.max_ncores_used: 612 print("Cannot exceed max_ncores_used %s" % self.max_ncores_used, ", ncores_allocated:", flow.ncores_allocated) 613 return 614 615 # Try to restart the unconverged tasks 616 # TODO: do not fire here but prepare for fireing in rapidfire 617 for task in self.flow.unconverged_tasks: 618 try: 619 logger.info("Flow will try restart task %s" % task) 620 fired = task.restart() 621 if fired: 622 self.nlaunch += 1 623 max_nlaunch -= 1 624 if max_nlaunch == 0: 625 logger.info("Restart: too many jobs in the queue, returning") 626 flow.pickle_dump() 627 return 628 629 except task.RestartError: 630 excs.append(straceback()) 631 632 # Temporarily disable by MG because I don't know if fix_critical works after the 633 # introduction of the new qadapters 634 # reenabled by MsS disable things that do not work at low level 635 # fix only prepares for restarting, and sets to ready 636 if self.fix_qcritical: 637 nfixed = flow.fix_queue_critical() 638 if nfixed: print("Fixed %d QCritical error(s)" % nfixed) 639 640 nfixed = flow.fix_abicritical() 641 if nfixed: print("Fixed %d AbiCritical error(s)" % nfixed) 642 643 # update database 644 flow.pickle_dump() 645 646 # Submit the tasks that are ready. 647 try: 648 nlaunch = PyLauncher(flow).rapidfire(max_nlaunch=max_nlaunch, sleep_time=10) 649 self.nlaunch += nlaunch 650 if nlaunch: 651 cprint("[%s] Number of launches: %d" % (time.asctime(), nlaunch), "yellow") 652 653 except Exception: 654 excs.append(straceback()) 655 656 # check status. 657 flow.show_status() 658 659 if excs: 660 logger.critical("*** Scheduler exceptions:\n *** %s" % "\n".join(excs)) 661 self.exceptions.extend(excs) 662 663 def callback(self): 664 """The function that will be executed by the scheduler.""" 665 try: 666 return self._callback() 667 except Exception: 668 # All exceptions raised here will trigger the shutdown! 669 s = straceback() 670 self.exceptions.append(s) 671 672 # This is useful when debugging 673 #try: 674 # print("Exception in callback, will cancel all tasks") 675 # for task in self.flow.iflat_tasks(): 676 # task.cancel() 677 #except Exception: 678 # pass 679 680 self.shutdown(msg="Exception raised in callback!\n" + s) 681 682 def _callback(self): 683 """The actual callback.""" 684 if self.debug: 685 # Show the number of open file descriptors 686 print(">>>>> _callback: Number of open file descriptors: %s" % get_open_fds()) 687 688 self._runem_all() 689 690 all_ok = self.flow.all_ok 691 #if all_ok: all_ok = self.flow.on_all_ok() 692 693 # Mission accomplished. Shutdown the scheduler. 694 if all_ok: 695 return self.shutdown(msg="All tasks have reached S_OK. Will shutdown the scheduler and exit") 696 697 # Handle failures. 698 err_lines = [] 699 700 # Shall we send a reminder to the user? 701 delta_etime = self.get_delta_etime() 702 703 if delta_etime.total_seconds() > self.num_reminders * self.remindme_s: 704 self.num_reminders += 1 705 msg = ("Just to remind you that the scheduler with pid %s, flow %s\n has been running for %s " % 706 (self.pid, self.flow, delta_etime)) 707 retcode = self.send_email(msg, tag="[REMINDER]") 708 709 if retcode: 710 # Cannot send mail, shutdown now! 711 msg += ("\nThe scheduler tried to send an e-mail to remind the user\n" + 712 " but send_email returned %d. Error is not critical though!" % retcode) 713 print(msg) 714 #err_lines.append(msg) 715 716 #if delta_etime.total_seconds() > self.max_etime_s: 717 # err_lines.append("\nExceeded max_etime_s %s. Will shutdown the scheduler and exit" % self.max_etime_s) 718 719 # Too many exceptions. Shutdown the scheduler. 720 if self.num_excs > self.max_num_pyexcs: 721 msg = "Number of exceptions %s > %s. Will shutdown the scheduler and exit" % ( 722 self.num_excs, self.max_num_pyexcs) 723 err_lines.append(boxed(msg)) 724 725 # Paranoid check: disable the scheduler if we have submitted 726 # too many jobs (it might be due to some bug or other external reasons 727 # such as race conditions between difference callbacks!) 728 if self.nlaunch > self.safety_ratio * self.flow.num_tasks: 729 msg = "Too many jobs launched %d. Total number of tasks = %s, Will shutdown the scheduler and exit" % ( 730 self.nlaunch, self.flow.num_tasks) 731 err_lines.append(boxed(msg)) 732 733 # Count the number of tasks with status == S_ERROR. 734 if self.flow.num_errored_tasks > self.max_num_abierrs: 735 msg = "Number of tasks with ERROR status %s > %s. Will shutdown the scheduler and exit" % ( 736 self.flow.num_errored_tasks, self.max_num_abierrs) 737 err_lines.append(boxed(msg)) 738 739 # Test on the presence of deadlocks. 740 g = self.flow.find_deadlocks() 741 if g.deadlocked: 742 # Check the flow again so that status are updated. 743 self.flow.check_status() 744 745 g = self.flow.find_deadlocks() 746 #print("deadlocked:\n", g.deadlocked, "\nrunnables:\n", g.runnables, "\nrunning\n", g.running) 747 print("deadlocked:", len(g.deadlocked), ", runnables:", len(g.runnables), ", running:", len(g.running)) 748 if g.deadlocked and not g.runnables and not g.running: 749 err_lines.append("No runnable job with deadlocked tasks:\n%s." % str(g.deadlocked)) 750 751 if not g.runnables and not g.running: 752 # Check the flow again so that status are updated. 753 self.flow.check_status() 754 g = self.flow.find_deadlocks() 755 if not g.runnables and not g.running: 756 err_lines.append("No task is running and cannot find other tasks to submit.") 757 758 # Something wrong. Quit 759 if err_lines: 760 # Cancel all jobs. 761 if self.killjobs_if_errors: 762 cprint("killjobs_if_errors set to 'yes' in scheduler file. Will kill jobs before exiting.", "yellow") 763 try: 764 num_cancelled = 0 765 for task in self.flow.iflat_tasks(): 766 num_cancelled += task.cancel() 767 cprint("Killed %d tasks" % num_cancelled, "yellow") 768 except Exception as exc: 769 cprint("Exception while trying to kill jobs:\n%s" % str(exc), "red") 770 771 self.shutdown("\n".join(err_lines)) 772 773 return len(self.exceptions) 774 775 def cleanup(self): 776 """Cleanup routine: remove the pid file and save the pickle database""" 777 try: 778 os.remove(self.pid_file) 779 except OSError as exc: 780 logger.critical("Could not remove pid_file: %s", exc) 781 782 # Save the final status of the flow. 783 self.flow.pickle_dump() 784 785 def shutdown(self, msg): 786 """Shutdown the scheduler.""" 787 try: 788 self.cleanup() 789 self.history.append("Completed on: %s" % time.asctime()) 790 self.history.append("Elapsed time: %s" % self.get_delta_etime()) 791 792 if self.debug: 793 print(">>>>> shutdown: Number of open file descriptors: %s" % get_open_fds()) 794 795 retcode = self.send_email(msg) 796 if self.debug: 797 print("send_mail retcode", retcode) 798 799 # Write file with the list of exceptions: 800 if self.exceptions: 801 dump_file = os.path.join(self.flow.workdir, "_exceptions") 802 with open(dump_file, "wt") as fh: 803 fh.writelines(self.exceptions) 804 fh.write("Shutdown message:\n%s" % msg) 805 806 lines = [] 807 app = lines.append 808 app("Submitted on: %s" % time.ctime(self.start_time)) 809 app("Completed on: %s" % time.asctime()) 810 app("Elapsed time: %s" % str(self.get_delta_etime())) 811 812 if self.flow.all_ok: 813 app("Flow completed successfully") 814 else: 815 app("Flow %s didn't complete successfully" % repr(self.flow.workdir)) 816 app("use `abirun.py FLOWDIR debug` to analyze the problem.") 817 app("Shutdown message:\n%s" % msg) 818 819 print("") 820 print("\n".join(lines)) 821 print("") 822 823 self._do_customer_service() 824 825 if self.flow.all_ok: 826 print("Calling flow.finalize()...") 827 self.flow.finalize() 828 #print("finalized:", self.flow.finalized) 829 if self.rmflow: 830 app("Flow directory will be removed...") 831 try: 832 self.flow.rmtree() 833 except Exception: 834 logger.warning("Ignoring exception while trying to remove flow dir.") 835 836 finally: 837 # Shutdown the scheduler thus allowing the process to exit. 838 logger.debug('This should be the shutdown of the scheduler') 839 840 # Unschedule all the jobs before calling shutdown 841 #self.sched.print_jobs() 842 if not has_sched_v3: 843 #self.sched.print_jobs() 844 for job in self.sched.get_jobs(): 845 self.sched.unschedule_job(job) 846 self.sched.shutdown() 847 else: 848 self.sched.shutdown(wait=False) 849 850 # Uncomment the line below if shutdown does not work! 851 #os.system("kill -9 %d" % os.getpid()) 852 853 def send_email(self, msg, tag=None): 854 """ 855 Send an e-mail before completing the shutdown. 856 Returns 0 if success. 857 """ 858 try: 859 return self._send_email(msg, tag) 860 except Exception: 861 self.exceptions.append(straceback()) 862 return -2 863 864 def _send_email(self, msg, tag): 865 if self.mailto is None: 866 return -1 867 868 header = msg.splitlines() 869 app = header.append 870 871 app("Submitted on: %s" % time.ctime(self.start_time)) 872 app("Completed on: %s" % time.asctime()) 873 app("Elapsed time: %s" % str(self.get_delta_etime())) 874 app("Number of errored tasks: %d" % self.flow.num_errored_tasks) 875 app("Number of unconverged tasks: %d" % self.flow.num_unconverged_tasks) 876 877 strio = StringIO() 878 strio.writelines("\n".join(header) + 4 * "\n") 879 880 # Add the status of the flow. 881 self.flow.show_status(stream=strio) 882 883 if self.exceptions: 884 # Report the list of exceptions. 885 strio.writelines(self.exceptions) 886 887 if tag is None: 888 tag = " [ALL OK]" if self.flow.all_ok else " [WARNING]" 889 890 return sendmail(subject=self.flow.name + tag, text=strio.getvalue(), mailto=self.mailto) 891 892 893def sendmail(subject, text, mailto, sender=None): 894 """ 895 Sends an e-mail with unix sendmail. 896 897 Args: 898 subject: String with the subject of the mail. 899 text: String with the body of the mail. 900 mailto: String or list of string with the recipients. 901 sender: string with the sender address. 902 If sender is None, username@hostname is used. 903 904 Returns: 905 Exit status 906 """ 907 def user_at_host(): 908 from socket import gethostname 909 return os.getlogin() + "@" + gethostname() 910 911 # Body of the message. 912 try: 913 sender = user_at_host() if sender is None else sender 914 except OSError: 915 sender = 'abipyscheduler@youknowwhere' 916 917 if is_string(mailto): mailto = [mailto] 918 919 from email.mime.text import MIMEText 920 mail = MIMEText(text) 921 mail["Subject"] = subject 922 mail["From"] = sender 923 mail["To"] = ", ".join(mailto) 924 925 msg = mail.as_string() 926 927 # sendmail works much better than the python interface. 928 # Note that sendmail is available only on Unix-like OS. 929 from subprocess import Popen, PIPE 930 import sys 931 932 sendmail = which("sendmail") 933 if sendmail is None: return -1 934 if sys.version_info[0] < 3: 935 p = Popen([sendmail, "-t"], stdin=PIPE, stderr=PIPE) 936 else: 937 # msg is string not bytes so must use universal_newlines 938 p = Popen([sendmail, "-t"], stdin=PIPE, stderr=PIPE, universal_newlines=True) 939 940 outdata, errdata = p.communicate(msg) 941 return len(errdata) 942 943 944def __test_sendmail(): 945 retcode = sendmail("sendmail_test", text="hello\nworld", mailto="nobody@nowhere.com") 946 print("Retcode", retcode) 947 assert retcode == 0 948 949 950class BatchLauncherError(Exception): 951 """Exceptions raised by :class:`BatchLauncher`.""" 952 953 954class BatchLauncher(object): 955 """ 956 This object automates the execution of multiple flow. It generates a job script 957 that uses abirun.py to run each flow stored in self with a scheduler. 958 The execution of the flows is done in sequential but each scheduler will start 959 to submit the tasks of the flow in autoparal mode. 960 961 The `BatchLauncher` is pickleable, hence one can reload it, check if all flows are completed 962 and rerun only those that are not completed due to the timelimit. 963 """ 964 PICKLE_FNAME = "__BatchLauncher__.pickle" 965 966 Error = BatchLauncherError 967 968 @classmethod 969 def from_dir(cls, top, workdir=None, name=None, manager=None, max_depth=2): 970 """ 971 Find all flows located withing the directory `top` and build the `BatchLauncher`. 972 973 Args: 974 top: Top level directory or list of directories. 975 workdir: Batch workdir. 976 name: 977 manager: |TaskManager| object. If None, the manager is read from `manager.yml` 978 In this case the YAML file must provide the entry `batch_manager` that defined 979 the queue adapter used to submit the batch script. 980 max_depth: Search in directory only if it is N or fewer levels below top 981 """ 982 from .flows import Flow 983 984 def find_pickles(dirtop): 985 # Walk through each directory inside path and find the pickle database. 986 paths = [] 987 for dirpath, dirnames, filenames in os.walk(dirtop): 988 fnames = [f for f in filenames if f == Flow.PICKLE_FNAME] 989 paths.extend([os.path.join(dirpath, f) for f in fnames]) 990 return paths 991 992 if is_string(top): 993 pickle_paths = find_pickles(top) 994 else: 995 # List of directories. 996 pickle_paths = [] 997 for p in top: 998 pickle_paths.extend(find_pickles(p)) 999 1000 #workdir = os.path.join(top, "batch") if workdir is None else workdir 1001 workdir = "batch" if workdir is None else workdir 1002 new = cls(workdir, name=name, manager=manager) 1003 1004 for path in pickle_paths: 1005 new.add_flow(path) 1006 1007 return new 1008 1009 @classmethod 1010 def pickle_load(cls, filepath): 1011 """ 1012 Loads the object from a pickle file. 1013 1014 Args: 1015 filepath: Filename or directory name. It filepath is a directory, we 1016 scan the directory tree starting from filepath and we 1017 read the first pickle database. Raise RuntimeError if multiple 1018 databases are found. 1019 """ 1020 if os.path.isdir(filepath): 1021 # Walk through each directory inside path and find the pickle database. 1022 for dirpath, dirnames, filenames in os.walk(filepath): 1023 fnames = [f for f in filenames if f == cls.PICKLE_FNAME] 1024 if fnames: 1025 if len(fnames) == 1: 1026 filepath = os.path.join(dirpath, fnames[0]) 1027 break # Exit os.walk 1028 else: 1029 err_msg = "Found multiple databases:\n %s" % str(fnames) 1030 raise RuntimeError(err_msg) 1031 else: 1032 err_msg = "Cannot find %s inside directory %s" % (cls.PICKLE_FNAME, filepath) 1033 raise ValueError(err_msg) 1034 1035 with open(filepath, "rb") as fh: 1036 new = pickle.load(fh) 1037 1038 # new.flows is a list of strings with the workdir of the flows (see __getstate__). 1039 # Here we read the Flow from the pickle file so that we have 1040 # and up-to-date version and we set the flow in visitor_mode 1041 from .flows import Flow 1042 flow_workdirs, new.flows = new.flows, [] 1043 for flow in map(Flow.pickle_load, flow_workdirs): 1044 new.add_flow(flow) 1045 1046 return new 1047 1048 def pickle_dump(self): 1049 """Save the status of the object in pickle format.""" 1050 with open(os.path.join(self.workdir, self.PICKLE_FNAME), mode="wb") as fh: 1051 pickle.dump(self, fh) 1052 1053 def __getstate__(self): 1054 """ 1055 Return state is pickled as the contents for the instance. 1056 1057 Here we replace the flow objects with their workdir because we are observing 1058 the flows and we want to have the updated version when we reload the `BatchLauncher` from pickle. 1059 """ 1060 d = {k: v for k, v in self.__dict__.items() if k not in ["flows"]} 1061 d["flows"] = [flow.workdir for flow in self.flows] 1062 return d 1063 1064 def __init__(self, workdir, name=None, flows=None, manager=None, timelimit=None): 1065 """ 1066 Args: 1067 workdir: Working directory 1068 name: Name assigned to the `BatchLauncher`. 1069 flows: List of |Flow| objects. 1070 manager: |TaskManager| object responsible for the submission of the jobs. 1071 If manager is None, the object is initialized from the yaml file 1072 located either in the working directory or in the user configuration dir. 1073 timelimit: Time limit (int with seconds or string with time given with 1074 the slurm convention: "days-hours:minutes:seconds". 1075 If timelimit is None, the default value specified in the `batch_adapter` is taken. 1076 """ 1077 self.workdir = os.path.abspath(workdir) 1078 1079 if not os.path.exists(self.workdir): 1080 os.makedirs(self.workdir) 1081 else: 1082 pass 1083 #raise RuntimeError("Directory %s already exists. Use BatchLauncher.pickle_load()" % self.workdir) 1084 1085 self.name = os.path.basename(self.workdir) if name is None else name 1086 self.script_file = File(os.path.join(self.workdir, "run.sh")) 1087 self.qerr_file = File(os.path.join(self.workdir, "queue.qerr")) 1088 self.qout_file = File(os.path.join(self.workdir, "queue.qout")) 1089 self.log_file = File(os.path.join(self.workdir, "run.log")) 1090 self.batch_pidfile = File(os.path.join(self.workdir, "batch.pid")) 1091 1092 from .tasks import TaskManager 1093 manager = TaskManager.as_manager(manager) 1094 1095 # Extract the qadapater to be used for the batch script. 1096 try: 1097 self.qadapter = qad = manager.batch_adapter 1098 except AttributeError: 1099 raise RuntimeError("Your manager.yml file does not define an entry for the batch_adapter") 1100 1101 if qad is None: 1102 raise RuntimeError("Your manager.yml file does not define an entry for the batch_adapter") 1103 1104 # Set mpi_procs to 1 just to be on the safe side 1105 # Then allow the user to change the timelimit via __init__ 1106 qad.set_mpi_procs(1) 1107 if timelimit is not None: 1108 self.set_timelimit(timelimit) 1109 # FIXME: Remove me! 1110 self.set_timelimit(36000) 1111 1112 # Initialize list of flows. 1113 if flows is None: flows = [] 1114 if not isinstance(flows, (list, tuple)): flows = [flows] 1115 self.flows = flows 1116 1117 def set_timelimit(self, timelimit): 1118 """ 1119 Set the timelimit of the batch launcher. 1120 1121 Args: 1122 timelimit: Time limit (int with seconds or string with time given 1123 with the slurm convention: "days-hours:minutes:seconds". 1124 """ 1125 self.qad.set_timelimit(qu.timelimit_parser(timelimit)) 1126 1127 def to_string(self, **kwargs): 1128 lines = [] 1129 lines.extend(str(self.qadapter).splitlines()) 1130 1131 for i, flow in enumerate(self.flows): 1132 lines.append("Flow [%d] " % i + str(flow)) 1133 1134 return "\n".join(lines) 1135 1136 def __str__(self): 1137 return self.to_string() 1138 1139 def add_flow(self, flow): 1140 """ 1141 Add a flow. Accept filepath or |Flow| object. Return 1 if flow was added else 0. 1142 """ 1143 from .flows import Flow 1144 flow = Flow.as_flow(flow) 1145 1146 if flow in self.flows: 1147 raise self.Error("Cannot add same flow twice!") 1148 1149 if not flow.allocated: 1150 # Set the workdir of the flow here. Create a dir in self.workdir with name flow.name 1151 flow_workdir = os.path.join(self.workdir, os.path.basename(flow.name)) 1152 if flow_workdir in (flow.workdir for flow in self.flows): 1153 raise self.Error("Two flows have the same name and hence the same workdir!") 1154 flow.allocate(workdir=flow_workdir) 1155 1156 # Check if we are already using a scheduler to run this flow 1157 flow.check_pid_file() 1158 flow.set_spectator_mode(False) 1159 1160 flow.check_status(show=False) 1161 1162 #if flow.all_ok: 1163 # print("flow.all_ok: Ignoring %s" % flow) 1164 # return 0 1165 1166 self.flows.append(flow) 1167 #print("Flow %s added to the BatchLauncher" % flow) 1168 1169 return 1 1170 1171 def submit(self, **kwargs): 1172 """ 1173 Submit a job script that will run the schedulers with `abirun.py`. 1174 1175 Args: 1176 verbose: Verbosity level 1177 dry_run: Don't submit the script if dry_run. Default: False 1178 1179 Returns: 1180 namedtuple with attributes: 1181 retcode: Return code as returned by the submission script. 1182 qjob: :class:`QueueJob` object. 1183 num_flows_inbatch: Number of flows executed by the batch script 1184 1185 Return code of the job script submission. 1186 """ 1187 verbose, dry_run = kwargs.pop("verbose", 0), kwargs.pop("dry_run", False) 1188 1189 if not self.flows: 1190 print("Cannot submit an empty list of flows!") 1191 return 0 1192 1193 if hasattr(self, "qjob"): 1194 # This usually happens when we have loaded the object from pickle 1195 # and we have already submitted to batch script to the queue. 1196 # At this point we need to understand if the previous batch job 1197 # is still running before trying to submit it again. There are three cases: 1198 # 1199 # 1) The batch script has completed withing timelimit and therefore 1200 # the pid_file has been removed by the script. In this case, we 1201 # should not try to submit it again. 1202 1203 # 2) The batch script has been killed due to timelimit (other reasons are possible 1204 # but we neglect them). In this case the pid_file exists but there's no job with 1205 # this pid runnig and we can resubmit it again. 1206 1207 # 3) The batch script is still running. 1208 print("BatchLauncher has qjob %s" % self.qjob) 1209 1210 if not self.batch_pid_file.exists: 1211 print("It seems that the batch script reached the end. Wont' try to submit it again") 1212 return 0 1213 1214 msg = ("Here I have to understand if qjob is in the queue." 1215 " but I need an abstract API that can retrieve info from the queue id") 1216 raise RuntimeError(msg) 1217 1218 # TODO: Temptative API 1219 if self.qjob.in_status("Running|Queued"): 1220 print("Job is still running. Cannot submit") 1221 else: 1222 del self.qjob 1223 1224 script, num_flows_inbatch = self._get_script_nflows() 1225 1226 if num_flows_inbatch == 0: 1227 print("All flows have reached all_ok! Batch script won't be submitted") 1228 return 0 1229 1230 if verbose: 1231 print("*** submission script ***") 1232 print(script) 1233 1234 # Write the script. 1235 self.script_file.write(script) 1236 self.script_file.chmod(0o740) 1237 1238 # Builf the flow. 1239 for flow in self.flows: 1240 flow.build_and_pickle_dump() 1241 1242 # Submit the task and save the queue id. 1243 if dry_run: return -1 1244 1245 print("Will submit %s flows in batch script" % len(self.flows)) 1246 self.qjob, process = self.qadapter.submit_to_queue(self.script_file.path) 1247 1248 # Save the queue id in the pid file 1249 # The file will be removed by the job script if execution is completed. 1250 self.batch_pidfile.write(str(self.qjob.qid)) 1251 1252 self.pickle_dump() 1253 process.wait() 1254 1255 return dict2namedtuple(retcode=process.returncode, qjob=self.qjob, 1256 num_flows_inbatch=num_flows_inbatch) 1257 1258 def _get_script_nflows(self): 1259 """ 1260 Write the submission script. Return (script, num_flows_in_batch) 1261 """ 1262 flows_torun = [f for f in self.flows if not f.all_ok] 1263 if not flows_torun: 1264 return "", 0 1265 1266 executable = [ 1267 'export _LOG=%s' % self.log_file.path, 1268 'date1=$(date +"%s")', 1269 'echo Running abirun.py in batch mode > ${_LOG}', 1270 " ", 1271 ] 1272 app = executable.append 1273 1274 # Build list of abirun commands and save the name of the log files. 1275 self.sched_logs, num_flows = [], len(flows_torun) 1276 for i, flow in enumerate(flows_torun): 1277 1278 logfile = os.path.join(self.workdir, "log_" + os.path.basename(flow.workdir)) 1279 1280 app("echo Starting flow %d/%d on: `date` >> ${LOG}" % (i+1, num_flows)) 1281 app("\nabirun.py %s scheduler > %s" % (flow.workdir, logfile)) 1282 app("echo Returning from abirun on `date` with retcode $? >> ${_LOG}") 1283 1284 assert logfile not in self.sched_logs 1285 self.sched_logs.append(logfile) 1286 1287 # Remove the batch pid_file and compute elapsed time. 1288 executable.extend([ 1289 " ", 1290 "# Remove batch pid file", 1291 'rm %s' % self.batch_pidfile.path, 1292 " ", 1293 "# Compute elapsed time", 1294 'date2=$(date +"%s")', 1295 'diff=$(($date2-$date1))', 1296 'echo $(($diff / 60)) minutes and $(($diff % 60)) seconds elapsed. >> ${_LOG}' 1297 ]) 1298 1299 return self.qadapter.get_script_str( 1300 job_name=self.name, 1301 launch_dir=self.workdir, 1302 executable=executable, 1303 qout_path=self.qout_file.path, 1304 qerr_path=self.qerr_file.path, 1305 ), num_flows 1306 1307 def show_summary(self, **kwargs): 1308 """ 1309 Show a summary with the status of the flows. 1310 """ 1311 for flow in self.flows: 1312 flow.show_summary() 1313 1314 def show_status(self, **kwargs): 1315 """ 1316 Report the status of the flows. 1317 1318 Args: 1319 stream: File-like object, Default: sys.stdout 1320 verbose: Verbosity level (default 0). > 0 to show only the works that are not finalized. 1321 """ 1322 for flow in self.flows: 1323 flow.show_status(**kwargs) 1324