1# coding: utf-8
2# Copyright (c) Pymatgen Development Team.
3# Distributed under the terms of the MIT License.
4"""Tools for the submission of Tasks."""
5
6import os
7import time
8import ruamel.yaml as yaml
9import pickle
10
11from collections import deque
12from datetime import timedelta
13from io import StringIO
14from monty.io import get_open_fds
15from monty.string import boxed, is_string
16from monty.os.path import which
17from monty.collections import AttrDict, dict2namedtuple
18from monty.termcolor import cprint
19from .utils import as_bool, File, Directory
20from . import qutils as qu
21from pymatgen.util.io_utils import ask_yesno
22
try:
    import apscheduler
    has_apscheduler = True
    # Compare the *major* version numerically: the previous string comparison
    # (apscheduler.version >= "3.0.0") is lexicographic and would classify
    # e.g. "10.0" as older than "3.0.0".
    try:
        has_sched_v3 = int(apscheduler.version.split(".")[0]) >= 3
    except (AttributeError, ValueError):
        # Unparseable version string: assume the old (pre-3.0) API.
        has_sched_v3 = False
except ImportError:
    has_apscheduler = False
    # Define the flag anyway so the module never exposes an undefined name.
    has_sched_v3 = False
29
import logging
logger = logging.getLogger(__name__)

# Public API of this module (used by ``from <module> import *``).
__all__ = [
    "ScriptEditor",
    "PyLauncher",
    "PyFlowScheduler",
]
38
39
def straceback():
    """Return a string with the traceback of the exception being handled."""
    from traceback import format_exc
    return format_exc()
44
45
class ScriptEditor(object):
    """Simple editor that simplifies the writing of shell scripts."""

    # Shell interpreter used in the shebang line.
    _shell = '/bin/bash'

    def __init__(self):
        self._lines = []

    @property
    def shell(self):
        """Path to the shell interpreter used by the script."""
        return self._shell

    def _add(self, text, pre=""):
        """Append `text` (a string or an iterable of strings), each prefixed with `pre`."""
        # isinstance replaces monty's is_string: same semantics, no external helper.
        if isinstance(text, str):
            self._lines.append(pre + text)
        else:
            self._lines.extend(pre + t for t in text)

    def reset(self):
        """Reset the editor, discarding all the lines added so far."""
        # BUGFIX: re-initialize instead of `del self._lines`. Deleting the
        # attribute left the editor unusable: any add after a reset (including
        # the implicit reset in get_script_str) raised AttributeError.
        self._lines = []

    def shebang(self):
        """Adds the shebang line."""
        self._lines.append('#!' + self.shell)

    def declare_var(self, key, val):
        """Declare a env variable. If val is None the variable is unset."""
        if val is not None:
            line = "export " + key + '=' + str(val)
        else:
            line = "unset " + key

        self._add(line)

    def declare_vars(self, d):
        """Declare the variables defined in the dictionary d."""
        for k, v in d.items():
            self.declare_var(k, v)

    def export_envar(self, key, val):
        """Export an environment variable."""
        self._add("export " + key + "=" + str(val))

    def export_envars(self, env):
        """Export the environment variables contained in the dict env."""
        for k, v in env.items():
            self.export_envar(k, v)

    def add_emptyline(self):
        """Add an empty line."""
        self._add("", pre="")

    def add_comment(self, comment):
        """Add a comment line (prefixed with '# ')."""
        self._add(comment, pre="# ")

    def load_modules(self, modules):
        """Load the list of specified modules."""
        for module in modules:
            self.load_module(module)

    def load_module(self, module):
        """Load a single module; stderr is appended to mods.err."""
        self._add('module load ' + module + " 2>> mods.err")

    def add_line(self, line):
        """Add a single line to the script."""
        self._add(line)

    def add_lines(self, lines):
        """Add a list of lines to the script."""
        self._add(lines)

    def get_script_str(self, reset=True):
        """Returns a string with the script and reset the editor if reset is True"""
        s = "\n".join(self._lines)
        if reset:
            self.reset()
        return s
126
127
class PyLauncherError(Exception):
    """Exception raised by :class:`PyLauncher`."""


class PyLauncher(object):
    """This object handle the submission of the tasks contained in a |Flow|."""
    Error = PyLauncherError

    def __init__(self, flow, **kwargs):
        """
        Initialize the object

        Args:
            flow: |Flow| object
            max_njobs_inqueue: The launcher will stop submitting jobs when the
                number of jobs in the queue is >= Max number of jobs
        """
        self.flow = flow
        self.max_njobs_inqueue = kwargs.get("max_njobs_inqueue", 200)

    def single_shot(self):
        """
        Run the first :class:`Task` than is ready for execution.

        Returns:
            Number of jobs launched.
        """
        launched_count = 0

        # Collect one candidate task per work.
        candidates = []
        for work in self.flow:
            try:
                candidate = work.fetch_task_to_run()
            except StopIteration:
                logger.info("All tasks completed.")
                continue

            if candidate is None:
                # No task found, this usually happens when we have dependencies.
                # Beware of possible deadlocks here!
                logger.debug("No task to run! Possible deadlock")
            else:
                candidates.append(candidate)

        # Fire the first candidate and persist the flow.
        if candidates:
            candidates[0].start()
            launched_count += 1
            self.flow.pickle_dump()

        return launched_count

    def rapidfire(self, max_nlaunch=-1, max_loops=1, sleep_time=5):
        """
        Keeps submitting `Tasks` until we are out of jobs or no job is ready to run.

        Args:
            max_nlaunch: Maximum number of launches. default: no limit.
            max_loops: Maximum number of loops
            sleep_time: seconds to sleep between rapidfire loop iterations

        Returns:
            The number of tasks launched.
        """
        num_launched = 0
        launched = []
        stop = False

        count = 0
        while count < max_loops and not stop:
            if count:
                time.sleep(sleep_time)
            count += 1

            tasks = self.fetch_tasks_to_run()

            # I don't know why but we receive duplicated tasks.
            if any(task in launched for task in tasks):
                logger.critical("numtasks %d already in launched list:\n%s" % (len(tasks), launched))

            # Preventive test: drop anything we already fired.
            for task in [t for t in tasks if t not in launched]:
                if task.start():
                    launched.append(task)
                    num_launched += 1

                if num_launched >= max_nlaunch > 0:
                    logger.info('num_launched >= max_nlaunch, going back to sleep')
                    stop = True
                    break

        # Update the database.
        self.flow.pickle_dump()

        return num_launched

    def fetch_tasks_to_run(self):
        """
        Return the list of tasks that can be submitted.
        Empty list if no task has been found.
        """
        runnable = []
        for work in self.flow:
            runnable += work.fetch_alltasks_to_run()

        return runnable
243
244
class PyFlowSchedulerError(Exception):
    """Base class for the exceptions raised by :class:`PyFlowScheduler`."""
247
248
class PyFlowScheduler(object):
    """
    This object schedules the submission of the tasks in a |Flow|.
    There are two types of errors that might occur during the execution of the jobs:

        #. Python exceptions
        #. Errors in the ab-initio code

    Python exceptions are easy to detect and are usually due to a bug in the python code or random errors such as IOError.
    The set of errors in the ab-initio is much much broader. It includes wrong input data, segmentation
    faults, problems with the resource manager, etc. The flow tries to handle the most common cases
    but there's still a lot of room for improvement.
    Note, in particular, that `PyFlowScheduler` will shutdown automatically in the following cases:

        #. The number of python exceptions is > max_num_pyexcs

        #. The number of task errors (i.e. the number of tasks whose status is S_ERROR) is > max_num_abierrs

        #. The number of jobs launched becomes greater than (`safety_ratio` * total_number_of_tasks).

        #. The scheduler will send an email to the user (specified by `mailto`) every `remindme_s` seconds.
           If the mail cannot be sent, the scheduler will shutdown automatically.
           This check prevents the scheduler from being trapped in an infinite loop.
    """
    # Configuration file.
    YAML_FILE = "scheduler.yml"
    # Per-user configuration directory searched by from_user_config().
    USER_CONFIG_DIR = os.path.join(os.path.expanduser("~"), ".abinit", "abipy")

    # Exception class raised by this object.
    Error = PyFlowSchedulerError
278
279    @classmethod
280    def autodoc(cls):
281        i = cls.__init__.__doc__.index("Args:")
282        return cls.__init__.__doc__[i+5:]
283
    def __init__(self, **kwargs):
        """
        Args:
            weeks: number of weeks to wait (DEFAULT: 0).
            days: number of days to wait (DEFAULT: 0).
            hours: number of hours to wait (DEFAULT: 0).
            minutes: number of minutes to wait (DEFAULT: 0).
            seconds: number of seconds to wait (DEFAULT: 0).
            mailto: The scheduler will send an email to `mailto` every `remindme_s` seconds.
                (DEFAULT: None i.e. not used).
            verbose: (int) verbosity level. (DEFAULT: 0)
            use_dynamic_manager: "yes" if the |TaskManager| must be re-initialized from
                file before launching the jobs. (DEFAULT: "no")
            max_njobs_inqueue: Limit on the number of jobs that can be present in the queue. (DEFAULT: 200)
            max_ncores_used: Maximum number of cores that can be used by the scheduler.
            contact_resource_manager: "yes" to ask the resource manager for the number of jobs in
                the queue instead of counting the running/submitted tasks in the flow. (DEFAULT: "no")
            remindme_s: The scheduler will send an email to the user specified
                by `mailto` every `remindme_s` seconds. (int, DEFAULT: 1 day).
            max_num_pyexcs: The scheduler will exit if the number of python exceptions is > max_num_pyexcs
                (int, DEFAULT: 0)
            max_num_abierrs: The scheduler will exit if the number of errored tasks is > max_num_abierrs
                (int, DEFAULT: 0)
            safety_ratio: The scheduler will exit if the number of jobs launched becomes greater than
               `safety_ratio` * total_number_of_tasks_in_flow. (int, DEFAULT: 5)
            max_nlaunches: Maximum number of tasks launched in a single iteration of the scheduler.
                (DEFAULT: -1 i.e. no limit)
            debug: Debug level. Use 0 for production (int, DEFAULT: 0)
            fix_qcritical: "yes" if the launcher should try to fix QCritical Errors (DEFAULT: "no")
            rmflow: If "yes", the scheduler will remove the flow directory if the calculation
                completed successfully. (DEFAULT: "no")
            killjobs_if_errors: "yes" if the scheduler should try to kill all the running jobs
                before exiting due to an error. (DEFAULT: "yes")
            customer_service_dir: Directory in which a lightweight tarball with the most important
                files is generated before shutdown. Requires `mailto`. (DEFAULT: None i.e. not used)
        """
        # Options passed to the scheduler: interval between two consecutive runs of the callback.
        self.sched_options = AttrDict(
            weeks=kwargs.pop("weeks", 0),
            days=kwargs.pop("days", 0),
            hours=kwargs.pop("hours", 0),
            minutes=kwargs.pop("minutes", 0),
            seconds=kwargs.pop("seconds", 0),
            #start_date=kwargs.pop("start_date", None),
        )
        # At least one time interval must be non-zero, otherwise the callback would never fire.
        if all(not v for v in self.sched_options.values()):
            raise self.Error("Wrong set of options passed to the scheduler.")

        self.mailto = kwargs.pop("mailto", None)
        self.verbose = int(kwargs.pop("verbose", 0))
        self.use_dynamic_manager = as_bool(kwargs.pop("use_dynamic_manager", False))
        self.max_njobs_inqueue = kwargs.pop("max_njobs_inqueue", 200)
        self.max_ncores_used = kwargs.pop("max_ncores_used", None)
        self.contact_resource_manager = as_bool(kwargs.pop("contact_resource_manager", False))

        self.remindme_s = float(kwargs.pop("remindme_s", 1 * 24 * 3600))
        self.max_num_pyexcs = int(kwargs.pop("max_num_pyexcs", 0))
        self.max_num_abierrs = int(kwargs.pop("max_num_abierrs", 0))
        self.safety_ratio = int(kwargs.pop("safety_ratio", 5))
        #self.max_etime_s = kwargs.pop("max_etime_s", )
        self.max_nlaunches = kwargs.pop("max_nlaunches", -1)
        self.debug = kwargs.pop("debug", 0)
        self.fix_qcritical = as_bool(kwargs.pop("fix_qcritical", False))
        self.rmflow = as_bool(kwargs.pop("rmflow", False))
        self.killjobs_if_errors = as_bool(kwargs.pop("killjobs_if_errors", True))

        self.customer_service_dir = kwargs.pop("customer_service_dir", None)
        if self.customer_service_dir is not None:
            self.customer_service_dir = Directory(self.customer_service_dir)
            self._validate_customer_service()

        # Every recognized option was popped above: anything left is a typo or unsupported.
        if kwargs:
            raise self.Error("Unknown arguments %s" % kwargs)

        if not has_apscheduler:
            raise RuntimeError("Install apscheduler with pip")

        # The apscheduler API changed in v3: pick the proper scheduler class.
        if has_sched_v3:
            logger.warning("Using scheduler v>=3.0.0")
            from apscheduler.schedulers.blocking import BlockingScheduler
            self.sched = BlockingScheduler()
        else:
            from apscheduler.scheduler import Scheduler
            self.sched = Scheduler(standalone=True)

        # Number of tasks launched so far and number of reminder emails already sent.
        self.nlaunch = 0
        self.num_reminders = 1

        # Used to keep track of the exceptions raised while the scheduler is running
        self.exceptions = deque(maxlen=self.max_num_pyexcs + 10)

        # Used to push additional info during the execution.
        self.history = deque(maxlen=100)
373
374    @classmethod
375    def from_file(cls, filepath):
376        """Read the configuration parameters from a Yaml file."""
377        with open(filepath, "rt") as fh:
378            return cls(**yaml.safe_load(fh))
379
380    @classmethod
381    def from_string(cls, s):
382        """Create an istance from string s containing a YAML dictionary."""
383        stream = StringIO(s)
384        stream.seek(0)
385        return cls(**yaml.safe_load(stream))
386
387    @classmethod
388    def from_user_config(cls):
389        """
390        Initialize the :class:`PyFlowScheduler` from the YAML file 'scheduler.yml'.
391        Search first in the working directory and then in the configuration directory of abipy.
392
393        Raises:
394            `RuntimeError` if file is not found.
395        """
396        # Try in the current directory.
397        path = os.path.join(os.getcwd(), cls.YAML_FILE)
398
399        if os.path.exists(path):
400            return cls.from_file(path)
401
402        # Try in the configuration directory.
403        path = os.path.join(cls.USER_CONFIG_DIR, cls.YAML_FILE)
404
405        if os.path.exists(path):
406            return cls.from_file(path)
407
408        raise cls.Error("Cannot locate %s neither in current directory nor in %s" % (cls.YAML_FILE, path))
409
410    def __str__(self):
411        """String representation."""
412        lines = [self.__class__.__name__ + ", Pid: %d" % self.pid]
413        app = lines.append
414        app("Scheduler options: %s" % str(self.sched_options))
415
416        if self.flow is not None:
417            app(80 * "=")
418            app(str(self.flow))
419
420        return "\n".join(lines)
421
422    @property
423    def pid(self):
424        """The pid of the process associated to the scheduler."""
425        try:
426            return self._pid
427        except AttributeError:
428            self._pid = os.getpid()
429            return self._pid
430
431    @property
432    def pid_file(self):
433        """
434        Absolute path of the file with the pid.
435        The file is located in the workdir of the flow
436        """
437        return self._pid_file
438
439    @property
440    def flow(self):
441        """`Flow`."""
442        try:
443            return self._flow
444        except AttributeError:
445            return None
446
447    @property
448    def num_excs(self):
449        """Number of exceptions raised so far."""
450        return len(self.exceptions)
451
452    def get_delta_etime(self):
453        """Returns a `timedelta` object representing with the elapsed time."""
454        return timedelta(seconds=(time.time() - self.start_time))
455
456    def add_flow(self, flow):
457        """
458        Add a flow to the scheduler.
459        """
460        if hasattr(self, "_flow"):
461            raise self.Error("Only one flow can be added to the scheduler.")
462
463        # Check if we are already using a scheduler to run this flow
464        flow.check_pid_file()
465        flow.set_spectator_mode(False)
466
467        # Build dirs and files (if not yet done)
468        flow.build()
469
470        with open(flow.pid_file, "wt") as fh:
471            fh.write(str(self.pid))
472
473        self._pid_file = flow.pid_file
474        self._flow = flow
475
476    def _validate_customer_service(self):
477        """
478        Validate input parameters if customer service is on then
479        create directory for tarball files with correct premissions for user and group.
480        """
481        direc = self.customer_service_dir
482        if not direc.exists:
483            mode = 0o750
484            print("Creating customer_service_dir %s with mode %s" % (direc, mode))
485            direc.makedirs()
486            os.chmod(direc.path, mode)
487
488        if self.mailto is None:
489            raise RuntimeError("customer_service_dir requires mailto option in scheduler.yml")
490
491    def _do_customer_service(self):
492        """
493        This method is called before the shutdown of the scheduler.
494        If customer_service is on and the flow didn't completed successfully,
495        a lightweight tarball file with inputs and the most important output files
496        is created in customer_servide_dir.
497        """
498        if self.customer_service_dir is None: return
499        doit = self.exceptions or not self.flow.all_ok
500        doit = True
501        if not doit: return
502
503        prefix = os.path.basename(self.flow.workdir) + "_"
504
505        import tempfile, datetime
506        suffix = str(datetime.datetime.now()).replace(" ", "-")
507        # Remove milliseconds
508        i = suffix.index(".")
509        if i != -1: suffix = suffix[:i]
510        suffix += ".tar.gz"
511
512        #back = os.getcwd()
513        #os.chdir(self.customer_service_dir.path)
514
515        _, tmpname = tempfile.mkstemp(suffix="_" + suffix, prefix=prefix,
516                                      dir=self.customer_service_dir.path, text=False)
517
518        print("Dear customer,\n We are about to generate a tarball in\n  %s" % tmpname)
519        self.flow.make_light_tarfile(name=tmpname)
520        #os.chdir(back)
521
    def start(self):
        """
        Starts the scheduler in a new thread. Returns 0 if success.
        In standalone mode, this method will block until there are no more scheduled jobs.
        """
        self.history.append("Started on %s" % time.asctime())
        self.start_time = time.time()

        if not has_apscheduler:
            raise RuntimeError("Install apscheduler with pip")

        # Register the periodic callback (the apscheduler API changed in v3).
        if has_sched_v3:
            self.sched.add_job(self.callback, "interval", **self.sched_options)
        else:
            self.sched.add_interval_job(self.callback, **self.sched_options)

        # Sanity check of the flow before entering the main loop.
        errors = self.flow.look_before_you_leap()
        if errors:
            self.exceptions.append(errors)
            return 1

        # Try to run the job immediately. If something goes wrong return without initializing the scheduler.
        self._runem_all()

        if self.exceptions:
            self.cleanup()
            self.send_email(msg="Error while trying to run the flow for the first time!\n %s" % self.exceptions)
            return 1

        try:
            # Blocks until the scheduler is shut down (standalone mode).
            self.sched.start()
            return 0

        except KeyboardInterrupt:
            # Ctrl-C from the user: shutdown and optionally cancel the queued jobs.
            self.shutdown(msg="KeyboardInterrupt from user")
            if ask_yesno("Do you want to cancel all the jobs in the queue? [Y/n]"):
                print("Number of jobs cancelled:", self.flow.cancel())

            self.flow.pickle_dump()
            return -1
562
    def _runem_all(self):
        """
        This function checks the status of all tasks,
        tries to fix tasks that went unconverged, abicritical, or queuecritical
        and tries to run all the tasks that can be submitted.
        """
        excs = []
        flow = self.flow

        # Allow to change the manager at run-time
        if self.use_dynamic_manager:
            from pymatgen.io.abinit.tasks import TaskManager
            new_manager = TaskManager.from_user_config()
            for work in flow:
                work.set_manager(new_manager)

        # Estimate the number of jobs currently in the queue, either by asking
        # the resource manager (expensive, optional) or by counting the tasks
        # in the flow that are in the S_RUN/S_SUB states.
        nqjobs = 0
        if self.contact_resource_manager: # and flow.TaskManager.qadapter.QTYPE == "shell":
            # This call is expensive and therefore it's optional (must be activate in manager.yml)
            nqjobs = flow.get_njobs_in_queue()
            if nqjobs is None:
                nqjobs = 0
                if flow.manager.has_queue:
                    logger.warning('Cannot get njobs_inqueue')
        else:
            # Here we just count the number of tasks in the flow who are running.
            # This logic breaks down if there are multiple schedulers running
            # but it's easy to implement without having to contact the resource manager.
            nqjobs = (len(list(flow.iflat_tasks(status=flow.S_RUN))) +
                      len(list(flow.iflat_tasks(status=flow.S_SUB))))

        if nqjobs >= self.max_njobs_inqueue:
            print("Too many jobs in the queue: %s. No job will be submitted." % nqjobs)
            flow.check_status(show=False)
            return

        # Maximum number of jobs we are allowed to launch in this iteration.
        if self.max_nlaunches == -1:
            max_nlaunch = self.max_njobs_inqueue - nqjobs
        else:
            max_nlaunch = min(self.max_njobs_inqueue - nqjobs, self.max_nlaunches)

        # check status.
        flow.check_status(show=False)

        # This check is not perfect, we should make a list of tasks to submit
        # and select only the subset so that we don't exceed max_ncores_used.
        # Many sections of this code should be rewritten.
        #if self.max_ncores_used is not None and flow.ncores_used > self.max_ncores_used:
        if self.max_ncores_used is not None and flow.ncores_allocated > self.max_ncores_used:
            print("Cannot exceed max_ncores_used %s" % self.max_ncores_used, ", ncores_allocated:", flow.ncores_allocated)
            return

        # Try to restart the unconverged tasks
        # TODO: do not fire here but prepare for fireing in rapidfire
        for task in self.flow.unconverged_tasks:
            try:
                logger.info("Flow will try restart task %s" % task)
                fired = task.restart()
                if fired:
                    self.nlaunch += 1
                    max_nlaunch -= 1
                    if max_nlaunch == 0:
                        logger.info("Restart: too many jobs in the queue, returning")
                        flow.pickle_dump()
                        return

            except task.RestartError:
                excs.append(straceback())

        # Temporarily disable by MG because I don't know if fix_critical works after the
        # introduction of the new qadapters
        # reenabled by MsS disable things that do not work at low level
        # fix only prepares for restarting, and sets to ready
        if self.fix_qcritical:
            nfixed = flow.fix_queue_critical()
            if nfixed: print("Fixed %d QCritical error(s)" % nfixed)

        nfixed = flow.fix_abicritical()
        if nfixed: print("Fixed %d AbiCritical error(s)" % nfixed)

        # update database
        flow.pickle_dump()

        # Submit the tasks that are ready.
        try:
            nlaunch = PyLauncher(flow).rapidfire(max_nlaunch=max_nlaunch, sleep_time=10)
            self.nlaunch += nlaunch
            if nlaunch:
                cprint("[%s] Number of launches: %d" % (time.asctime(), nlaunch), "yellow")

        except Exception:
            excs.append(straceback())

        # check status.
        flow.show_status()

        if excs:
            # Record the exceptions: _callback decides whether they force a shutdown.
            logger.critical("*** Scheduler exceptions:\n *** %s" % "\n".join(excs))
            self.exceptions.extend(excs)
662
663    def callback(self):
664        """The function that will be executed by the scheduler."""
665        try:
666            return self._callback()
667        except Exception:
668            # All exceptions raised here will trigger the shutdown!
669            s = straceback()
670            self.exceptions.append(s)
671
672            # This is useful when debugging
673            #try:
674            #    print("Exception in callback, will cancel all tasks")
675            #    for task in self.flow.iflat_tasks():
676            #        task.cancel()
677            #except Exception:
678            #    pass
679
680            self.shutdown(msg="Exception raised in callback!\n" + s)
681
    def _callback(self):
        """The actual callback. Returns the number of exceptions registered so far."""
        if self.debug:
            # Show the number of open file descriptors
            print(">>>>> _callback: Number of open file descriptors: %s" % get_open_fds())

        self._runem_all()

        all_ok = self.flow.all_ok
        #if all_ok: all_ok = self.flow.on_all_ok()

        # Mission accomplished. Shutdown the scheduler.
        if all_ok:
            return self.shutdown(msg="All tasks have reached S_OK. Will shutdown the scheduler and exit")

        # Handle failures.
        err_lines = []

        # Shall we send a reminder to the user?
        delta_etime = self.get_delta_etime()

        if delta_etime.total_seconds() > self.num_reminders * self.remindme_s:
            self.num_reminders += 1
            msg = ("Just to remind you that the scheduler with pid %s, flow %s\n has been running for %s " %
                  (self.pid, self.flow, delta_etime))
            retcode = self.send_email(msg, tag="[REMINDER]")

            if retcode:
                # Cannot send mail, shutdown now!
                # NOTE(review): despite the comment above, the failure is only
                # printed and the scheduler keeps running (see message below).
                msg += ("\nThe scheduler tried to send an e-mail to remind the user\n" +
                        " but send_email returned %d. Error is not critical though!" % retcode)
                print(msg)
                #err_lines.append(msg)

        #if delta_etime.total_seconds() > self.max_etime_s:
        #    err_lines.append("\nExceeded max_etime_s %s. Will shutdown the scheduler and exit" % self.max_etime_s)

        # Too many exceptions. Shutdown the scheduler.
        if self.num_excs > self.max_num_pyexcs:
            msg = "Number of exceptions %s > %s. Will shutdown the scheduler and exit" % (
                self.num_excs, self.max_num_pyexcs)
            err_lines.append(boxed(msg))

        # Paranoid check: disable the scheduler if we have submitted
        # too many jobs (it might be due to some bug or other external reasons
        # such as race conditions between difference callbacks!)
        if self.nlaunch > self.safety_ratio * self.flow.num_tasks:
            msg = "Too many jobs launched %d. Total number of tasks = %s, Will shutdown the scheduler and exit" % (
                self.nlaunch, self.flow.num_tasks)
            err_lines.append(boxed(msg))

        # Count the number of tasks with status == S_ERROR.
        if self.flow.num_errored_tasks > self.max_num_abierrs:
            msg = "Number of tasks with ERROR status %s > %s. Will shutdown the scheduler and exit" % (
                self.flow.num_errored_tasks, self.max_num_abierrs)
            err_lines.append(boxed(msg))

        # Test on the presence of deadlocks.
        g = self.flow.find_deadlocks()
        if g.deadlocked:
            # Check the flow again so that status are updated.
            self.flow.check_status()

            g = self.flow.find_deadlocks()
            #print("deadlocked:\n", g.deadlocked, "\nrunnables:\n", g.runnables, "\nrunning\n", g.running)
            print("deadlocked:", len(g.deadlocked), ", runnables:", len(g.runnables), ", running:", len(g.running))
            if g.deadlocked and not g.runnables and not g.running:
                err_lines.append("No runnable job with deadlocked tasks:\n%s." % str(g.deadlocked))

        # Nothing is running and nothing can be submitted: the flow is stuck.
        if not g.runnables and not g.running:
            # Check the flow again so that status are updated.
            self.flow.check_status()
            g = self.flow.find_deadlocks()
            if not g.runnables and not g.running:
                err_lines.append("No task is running and cannot find other tasks to submit.")

        # Something wrong. Quit
        if err_lines:
            # Cancel all jobs.
            if self.killjobs_if_errors:
                cprint("killjobs_if_errors set to 'yes' in scheduler file. Will kill jobs before exiting.", "yellow")
                try:
                    num_cancelled = 0
                    for task in self.flow.iflat_tasks():
                        num_cancelled += task.cancel()
                    cprint("Killed %d tasks" % num_cancelled, "yellow")
                except Exception as exc:
                    cprint("Exception while trying to kill jobs:\n%s" % str(exc), "red")

            self.shutdown("\n".join(err_lines))

        return len(self.exceptions)
774
775    def cleanup(self):
776        """Cleanup routine: remove the pid file and save the pickle database"""
777        try:
778            os.remove(self.pid_file)
779        except OSError as exc:
780            logger.critical("Could not remove pid_file: %s", exc)
781
782        # Save the final status of the flow.
783        self.flow.pickle_dump()
784
785    def shutdown(self, msg):
786        """Shutdown the scheduler."""
787        try:
788            self.cleanup()
789            self.history.append("Completed on: %s" % time.asctime())
790            self.history.append("Elapsed time: %s" % self.get_delta_etime())
791
792            if self.debug:
793                print(">>>>> shutdown: Number of open file descriptors: %s" % get_open_fds())
794
795            retcode = self.send_email(msg)
796            if self.debug:
797                print("send_mail retcode", retcode)
798
799            # Write file with the list of exceptions:
800            if self.exceptions:
801                dump_file = os.path.join(self.flow.workdir, "_exceptions")
802                with open(dump_file, "wt") as fh:
803                    fh.writelines(self.exceptions)
804                    fh.write("Shutdown message:\n%s" % msg)
805
806            lines = []
807            app = lines.append
808            app("Submitted on: %s" % time.ctime(self.start_time))
809            app("Completed on: %s" % time.asctime())
810            app("Elapsed time: %s" % str(self.get_delta_etime()))
811
812            if self.flow.all_ok:
813                app("Flow completed successfully")
814            else:
815                app("Flow %s didn't complete successfully" % repr(self.flow.workdir))
816                app("use `abirun.py FLOWDIR debug` to analyze the problem.")
817                app("Shutdown message:\n%s" % msg)
818
819            print("")
820            print("\n".join(lines))
821            print("")
822
823            self._do_customer_service()
824
825            if self.flow.all_ok:
826                print("Calling flow.finalize()...")
827                self.flow.finalize()
828                #print("finalized:", self.flow.finalized)
829                if self.rmflow:
830                    app("Flow directory will be removed...")
831                    try:
832                        self.flow.rmtree()
833                    except Exception:
834                        logger.warning("Ignoring exception while trying to remove flow dir.")
835
836        finally:
837            # Shutdown the scheduler thus allowing the process to exit.
838            logger.debug('This should be the shutdown of the scheduler')
839
840            # Unschedule all the jobs before calling shutdown
841            #self.sched.print_jobs()
842            if not has_sched_v3:
843                #self.sched.print_jobs()
844                for job in self.sched.get_jobs():
845                    self.sched.unschedule_job(job)
846                self.sched.shutdown()
847            else:
848                self.sched.shutdown(wait=False)
849
850            # Uncomment the line below if shutdown does not work!
851            #os.system("kill -9 %d" % os.getpid())
852
853    def send_email(self, msg, tag=None):
854        """
855        Send an e-mail before completing the shutdown.
856        Returns 0 if success.
857        """
858        try:
859            return self._send_email(msg, tag)
860        except Exception:
861            self.exceptions.append(straceback())
862            return -2
863
864    def _send_email(self, msg, tag):
865        if self.mailto is None:
866            return -1
867
868        header = msg.splitlines()
869        app = header.append
870
871        app("Submitted on: %s" % time.ctime(self.start_time))
872        app("Completed on: %s" % time.asctime())
873        app("Elapsed time: %s" % str(self.get_delta_etime()))
874        app("Number of errored tasks: %d" % self.flow.num_errored_tasks)
875        app("Number of unconverged tasks: %d" % self.flow.num_unconverged_tasks)
876
877        strio = StringIO()
878        strio.writelines("\n".join(header) + 4 * "\n")
879
880        # Add the status of the flow.
881        self.flow.show_status(stream=strio)
882
883        if self.exceptions:
884            # Report the list of exceptions.
885            strio.writelines(self.exceptions)
886
887        if tag is None:
888            tag = " [ALL OK]" if self.flow.all_ok else " [WARNING]"
889
890        return sendmail(subject=self.flow.name + tag, text=strio.getvalue(), mailto=self.mailto)
891
892
def sendmail(subject, text, mailto, sender=None):
    """
    Sends an e-mail with unix sendmail.

    Args:
        subject: String with the subject of the mail.
        text: String with the body of the mail.
        mailto: String or list of string with the recipients.
        sender: string with the sender address.
            If sender is None, username@hostname is used.

    Returns:
        -1 if the sendmail executable cannot be found, else the length of
        the stderr output of sendmail (0 means success).
    """
    def user_at_host():
        from socket import gethostname
        return os.getlogin() + "@" + gethostname()

    # Body of the message.
    try:
        sender = user_at_host() if sender is None else sender
    except OSError:
        # os.getlogin can fail e.g. when the process has no controlling terminal.
        sender = 'abipyscheduler@youknowwhere'

    if is_string(mailto): mailto = [mailto]

    from email.mime.text import MIMEText
    mail = MIMEText(text)
    mail["Subject"] = subject
    mail["From"] = sender
    mail["To"] = ", ".join(mailto)

    msg = mail.as_string()

    # sendmail works much better than the python interface.
    # Note that sendmail is available only on Unix-like OS.
    from subprocess import Popen, PIPE

    sendmail = which("sendmail")
    if sendmail is None: return -1

    # The message is a str (not bytes), hence universal_newlines so that
    # Popen handles the encoding of stdin/stderr for us.
    # (The dead Python-2 branch and the unused `import sys` have been removed.)
    p = Popen([sendmail, "-t"], stdin=PIPE, stderr=PIPE, universal_newlines=True)

    outdata, errdata = p.communicate(msg)
    return len(errdata)
942
943
def __test_sendmail():
    """Smoke test: send a message to a dummy address and check the retcode."""
    rc = sendmail("sendmail_test", text="hello\nworld", mailto="nobody@nowhere.com")
    print("Retcode", rc)
    assert rc == 0
948
949
class BatchLauncherError(Exception):
    """Base class for the exceptions raised by :class:`BatchLauncher`."""
952
953
954class BatchLauncher(object):
955    """
    This object automates the execution of multiple flows. It generates a job script
    that uses abirun.py to run each flow stored in self with a scheduler.
    The flows are executed sequentially but each scheduler will start
    to submit the tasks of the flow in autoparal mode.
960
961    The `BatchLauncher` is pickleable, hence one can reload it, check if all flows are completed
962    and rerun only those that are not completed due to the timelimit.
963    """
964    PICKLE_FNAME = "__BatchLauncher__.pickle"
965
966    Error = BatchLauncherError
967
968    @classmethod
969    def from_dir(cls, top, workdir=None, name=None, manager=None, max_depth=2):
970        """
971        Find all flows located withing the directory `top` and build the `BatchLauncher`.
972
973        Args:
974            top: Top level directory or list of directories.
975            workdir: Batch workdir.
976            name:
977            manager: |TaskManager| object. If None, the manager is read from `manager.yml`
978                In this case the YAML file must provide the entry `batch_manager` that defined
979                the queue adapter used to submit the batch script.
980            max_depth: Search in directory only if it is N or fewer levels below top
981        """
982        from .flows import Flow
983
984        def find_pickles(dirtop):
985            # Walk through each directory inside path and find the pickle database.
986            paths = []
987            for dirpath, dirnames, filenames in os.walk(dirtop):
988                fnames = [f for f in filenames if f == Flow.PICKLE_FNAME]
989                paths.extend([os.path.join(dirpath, f) for f in fnames])
990            return paths
991
992        if is_string(top):
993            pickle_paths = find_pickles(top)
994        else:
995            # List of directories.
996            pickle_paths = []
997            for p in top:
998                pickle_paths.extend(find_pickles(p))
999
1000        #workdir = os.path.join(top, "batch") if workdir is None else workdir
1001        workdir = "batch" if workdir is None else workdir
1002        new = cls(workdir, name=name, manager=manager)
1003
1004        for path in pickle_paths:
1005            new.add_flow(path)
1006
1007        return new
1008
    @classmethod
    def pickle_load(cls, filepath):
        """
        Loads the object from a pickle file.

        Args:
            filepath: Filename or directory name. If filepath is a directory, we
                scan the directory tree starting from filepath and we
                read the first pickle database. Raise RuntimeError if multiple
                databases are found.
        """
        if os.path.isdir(filepath):
            # Walk through each directory inside path and find the pickle database.
            for dirpath, dirnames, filenames in os.walk(filepath):
                fnames = [f for f in filenames if f == cls.PICKLE_FNAME]
                if fnames:
                    if len(fnames) == 1:
                        filepath = os.path.join(dirpath, fnames[0])
                        break  # Exit os.walk
                    else:
                        err_msg = "Found multiple databases:\n %s" % str(fnames)
                        raise RuntimeError(err_msg)
            else:
                # for/else: reached only if the loop above terminated without break,
                # i.e. no pickle database was found anywhere below filepath.
                err_msg = "Cannot find %s inside directory %s" % (cls.PICKLE_FNAME, filepath)
                raise ValueError(err_msg)

        # NOTE(review): pickle.load must only be applied to trusted files --
        # unpickling untrusted data can execute arbitrary code.
        with open(filepath, "rb") as fh:
            new = pickle.load(fh)

        # new.flows is a list of strings with the workdir of the flows (see __getstate__).
        # Here we read the Flow from the pickle file so that we have
        # an up-to-date version and we set the flow in visitor_mode
        from .flows import Flow
        flow_workdirs, new.flows = new.flows, []
        for flow in map(Flow.pickle_load, flow_workdirs):
            new.add_flow(flow)

        return new
1047
1048    def pickle_dump(self):
1049        """Save the status of the object in pickle format."""
1050        with open(os.path.join(self.workdir, self.PICKLE_FNAME), mode="wb") as fh:
1051            pickle.dump(self, fh)
1052
1053    def __getstate__(self):
1054        """
1055        Return state is pickled as the contents for the instance.
1056
1057        Here we replace the flow objects with their workdir because we are observing
1058        the flows and we want to have the updated version when we reload the `BatchLauncher` from pickle.
1059        """
1060        d = {k: v for k, v in self.__dict__.items() if k not in ["flows"]}
1061        d["flows"] = [flow.workdir for flow in self.flows]
1062        return d
1063
1064    def __init__(self, workdir, name=None, flows=None, manager=None, timelimit=None):
1065        """
1066        Args:
1067            workdir: Working directory
1068            name: Name assigned to the `BatchLauncher`.
1069            flows:  List of |Flow| objects.
1070            manager: |TaskManager| object responsible for the submission of the jobs.
1071                     If manager is None, the object is initialized from the yaml file
1072                     located either in the working directory or in the user configuration dir.
1073            timelimit: Time limit (int with seconds or string with time given with
1074                the slurm convention: "days-hours:minutes:seconds".
1075                If timelimit is None, the default value specified in the `batch_adapter` is taken.
1076        """
1077        self.workdir = os.path.abspath(workdir)
1078
1079        if not os.path.exists(self.workdir):
1080            os.makedirs(self.workdir)
1081        else:
1082            pass
1083            #raise RuntimeError("Directory %s already exists. Use BatchLauncher.pickle_load()" % self.workdir)
1084
1085        self.name = os.path.basename(self.workdir) if name is None else name
1086        self.script_file = File(os.path.join(self.workdir, "run.sh"))
1087        self.qerr_file = File(os.path.join(self.workdir, "queue.qerr"))
1088        self.qout_file = File(os.path.join(self.workdir, "queue.qout"))
1089        self.log_file = File(os.path.join(self.workdir, "run.log"))
1090        self.batch_pidfile = File(os.path.join(self.workdir, "batch.pid"))
1091
1092        from .tasks import TaskManager
1093        manager = TaskManager.as_manager(manager)
1094
1095        # Extract the qadapater to be used for the batch script.
1096        try:
1097            self.qadapter = qad = manager.batch_adapter
1098        except AttributeError:
1099            raise RuntimeError("Your manager.yml file does not define an entry for the batch_adapter")
1100
1101        if qad is None:
1102            raise RuntimeError("Your manager.yml file does not define an entry for the batch_adapter")
1103
1104        # Set mpi_procs to 1 just to be on the safe side
1105        # Then allow the user to change the timelimit via __init__
1106        qad.set_mpi_procs(1)
1107        if timelimit is not None:
1108            self.set_timelimit(timelimit)
1109            # FIXME: Remove me!
1110            self.set_timelimit(36000)
1111
1112        # Initialize list of flows.
1113        if flows is None: flows = []
1114        if not isinstance(flows, (list, tuple)): flows = [flows]
1115        self.flows = flows
1116
1117    def set_timelimit(self, timelimit):
1118        """
1119        Set the timelimit of the batch launcher.
1120
1121        Args:
1122            timelimit: Time limit (int with seconds or string with time given
1123                with the slurm convention: "days-hours:minutes:seconds".
1124        """
1125        self.qad.set_timelimit(qu.timelimit_parser(timelimit))
1126
1127    def to_string(self, **kwargs):
1128        lines = []
1129        lines.extend(str(self.qadapter).splitlines())
1130
1131        for i, flow in enumerate(self.flows):
1132            lines.append("Flow [%d] " % i + str(flow))
1133
1134        return "\n".join(lines)
1135
1136    def __str__(self):
1137        return self.to_string()
1138
1139    def add_flow(self, flow):
1140        """
1141        Add a flow. Accept filepath or |Flow| object. Return 1 if flow was added else 0.
1142        """
1143        from .flows import Flow
1144        flow = Flow.as_flow(flow)
1145
1146        if flow in self.flows:
1147            raise self.Error("Cannot add same flow twice!")
1148
1149        if not flow.allocated:
1150            # Set the workdir of the flow here. Create a dir in self.workdir with name flow.name
1151            flow_workdir = os.path.join(self.workdir, os.path.basename(flow.name))
1152            if flow_workdir in (flow.workdir for flow in self.flows):
1153                raise self.Error("Two flows have the same name and hence the same workdir!")
1154            flow.allocate(workdir=flow_workdir)
1155
1156        # Check if we are already using a scheduler to run this flow
1157        flow.check_pid_file()
1158        flow.set_spectator_mode(False)
1159
1160        flow.check_status(show=False)
1161
1162        #if flow.all_ok:
1163        #    print("flow.all_ok: Ignoring %s" % flow)
1164        #    return 0
1165
1166        self.flows.append(flow)
1167        #print("Flow %s added to the BatchLauncher" % flow)
1168
1169        return 1
1170
1171    def submit(self, **kwargs):
1172        """
1173        Submit a job script that will run the schedulers with `abirun.py`.
1174
1175        Args:
1176            verbose: Verbosity level
1177            dry_run: Don't submit the script if dry_run. Default: False
1178
1179        Returns:
1180            namedtuple with attributes:
1181                retcode: Return code as returned by the submission script.
1182                qjob: :class:`QueueJob` object.
1183                num_flows_inbatch: Number of flows executed by the batch script
1184
1185            Return code of the job script submission.
1186        """
1187        verbose, dry_run = kwargs.pop("verbose", 0), kwargs.pop("dry_run", False)
1188
1189        if not self.flows:
1190            print("Cannot submit an empty list of flows!")
1191            return 0
1192
1193        if hasattr(self, "qjob"):
1194            # This usually happens when we have loaded the object from pickle
1195            # and we have already submitted to batch script to the queue.
1196            # At this point we need to understand if the previous batch job
1197            # is still running before trying to submit it again. There are three cases:
1198            #
1199            # 1) The batch script has completed withing timelimit and therefore
1200            #    the pid_file has been removed by the script. In this case, we
1201            #    should not try to submit it again.
1202
1203            # 2) The batch script has been killed due to timelimit (other reasons are possible
1204            #    but we neglect them). In this case the pid_file exists but there's no job with
1205            #    this pid runnig and we can resubmit it again.
1206
1207            # 3) The batch script is still running.
1208            print("BatchLauncher has qjob %s" % self.qjob)
1209
1210            if not self.batch_pid_file.exists:
1211                print("It seems that the batch script reached the end. Wont' try to submit it again")
1212                return 0
1213
1214            msg = ("Here I have to understand if qjob is in the queue."
1215                   " but I need an abstract API that can retrieve info from the queue id")
1216            raise RuntimeError(msg)
1217
1218            # TODO: Temptative API
1219            if self.qjob.in_status("Running|Queued"):
1220                print("Job is still running. Cannot submit")
1221            else:
1222                del self.qjob
1223
1224        script, num_flows_inbatch = self._get_script_nflows()
1225
1226        if num_flows_inbatch == 0:
1227            print("All flows have reached all_ok! Batch script won't be submitted")
1228            return 0
1229
1230        if verbose:
1231            print("*** submission script ***")
1232            print(script)
1233
1234        # Write the script.
1235        self.script_file.write(script)
1236        self.script_file.chmod(0o740)
1237
1238        # Builf the flow.
1239        for flow in self.flows:
1240            flow.build_and_pickle_dump()
1241
1242        # Submit the task and save the queue id.
1243        if dry_run: return -1
1244
1245        print("Will submit %s flows in batch script" % len(self.flows))
1246        self.qjob, process = self.qadapter.submit_to_queue(self.script_file.path)
1247
1248        # Save the queue id in the pid file
1249        # The file will be removed by the job script if execution is completed.
1250        self.batch_pidfile.write(str(self.qjob.qid))
1251
1252        self.pickle_dump()
1253        process.wait()
1254
1255        return dict2namedtuple(retcode=process.returncode, qjob=self.qjob,
1256                               num_flows_inbatch=num_flows_inbatch)
1257
1258    def _get_script_nflows(self):
1259        """
1260        Write the submission script. Return (script, num_flows_in_batch)
1261        """
1262        flows_torun = [f for f in self.flows if not f.all_ok]
1263        if not flows_torun:
1264            return "", 0
1265
1266        executable = [
1267            'export _LOG=%s' % self.log_file.path,
1268            'date1=$(date +"%s")',
1269            'echo Running abirun.py in batch mode > ${_LOG}',
1270            " ",
1271        ]
1272        app = executable.append
1273
1274        # Build list of abirun commands and save the name of the log files.
1275        self.sched_logs, num_flows = [], len(flows_torun)
1276        for i, flow in enumerate(flows_torun):
1277
1278            logfile = os.path.join(self.workdir, "log_" + os.path.basename(flow.workdir))
1279
1280            app("echo Starting flow %d/%d on: `date` >> ${LOG}" % (i+1, num_flows))
1281            app("\nabirun.py %s scheduler > %s" % (flow.workdir, logfile))
1282            app("echo Returning from abirun on `date` with retcode $? >> ${_LOG}")
1283
1284            assert logfile not in self.sched_logs
1285            self.sched_logs.append(logfile)
1286
1287        # Remove the batch pid_file and compute elapsed time.
1288        executable.extend([
1289            " ",
1290            "# Remove batch pid file",
1291            'rm %s' % self.batch_pidfile.path,
1292            " ",
1293            "# Compute elapsed time",
1294            'date2=$(date +"%s")',
1295            'diff=$(($date2-$date1))',
1296            'echo $(($diff / 60)) minutes and $(($diff % 60)) seconds elapsed. >> ${_LOG}'
1297        ])
1298
1299        return self.qadapter.get_script_str(
1300            job_name=self.name,
1301            launch_dir=self.workdir,
1302            executable=executable,
1303            qout_path=self.qout_file.path,
1304            qerr_path=self.qerr_file.path,
1305        ), num_flows
1306
1307    def show_summary(self, **kwargs):
1308        """
1309        Show a summary with the status of the flows.
1310        """
1311        for flow in self.flows:
1312            flow.show_summary()
1313
1314    def show_status(self, **kwargs):
1315        """
1316        Report the status of the flows.
1317
1318        Args:
1319            stream: File-like object, Default: sys.stdout
1320            verbose: Verbosity level (default 0). > 0 to show only the works that are not finalized.
1321        """
1322        for flow in self.flows:
1323            flow.show_status(**kwargs)
1324