1# coding: utf-8
2"""This module provides functions and classes related to Task objects."""
3import os
4import time
5import datetime
6import shutil
7import collections
8import abc
9import copy
10import ruamel.yaml as yaml
11from io import StringIO
12import numpy as np
13
14from pprint import pprint
15from itertools import product
16from monty.string import is_string, list_strings
17from monty.termcolor import colored, cprint
18from monty.collections import AttrDict
19from monty.functools import lazy_property, return_none_if_raise
20from monty.json import MSONable
21from monty.fnmatch import WildCard
22from pymatgen.core.units import Memory
23from pymatgen.util.serialization import json_pretty_dump, pmg_serialize
24from .utils import File, Directory, irdvars_for_ext, abi_splitext, FilepathFixer, Condition, SparseHistogram
25from .qadapters import make_qadapter, QueueAdapter, QueueAdapterError
26from . import qutils as qu
27from .db import DBConnector
28from .nodes import Status, Node, NodeError, NodeResults, FileNode #, check_spectator
29from . import abiinspect
30from . import events
31from .abitimer import AbinitTimerParser
32
33
34__author__ = "Matteo Giantomassi"
35__copyright__ = "Copyright 2013, The Materials Project"
36__version__ = "0.1"
37__maintainer__ = "Matteo Giantomassi"
38
39__all__ = [
40    "TaskManager",
41    "AbinitBuild",
42    "ParalHintsParser",
43    "ParalHints",
44    "AbinitTask",
45    "ScfTask",
46    "NscfTask",
47    "RelaxTask",
48    "DdkTask",
49    "EffMassTask",
50    "PhononTask",
51    "ElasticTask",
52    "SigmaTask",
53    "EphTask",
54    "OpticTask",
55    "AnaddbTask",
56    "set_user_config_taskmanager",
57]
58
59import logging
60logger = logging.getLogger(__name__)
61
62# Tools and helper functions.
63
64
65def straceback():
66    """Returns a string with the traceback."""
67    import traceback
68    return traceback.format_exc()
69
70
71def nmltostring(nml):
72    """Convert a dictionary representing a Fortran namelist into a string."""
73    if not isinstance(nml, dict):
74        raise ValueError("nml should be a dict !")
75
76    curstr = ""
77    for key, group in nml.items():
78        namelist = ["&" + key]
79        for k, v in group.items():
80            if isinstance(v, list) or isinstance(v, tuple):
81                namelist.append(k + " = " + ",".join(map(str, v)) + ",")
82            elif is_string(v):
83                namelist.append(k + " = '" + str(v) + "',")
84            else:
85                namelist.append(k + " = " + str(v) + ",")
86        namelist.append("/")
87        curstr = curstr + "\n".join(namelist) + "\n"
88
89    return curstr
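# Illustrative sketch of nmltostring (the group/variable names below are made up):
# each top-level key becomes a Fortran namelist group.
#
#   nml = {"cell": {"acell": (1.0, 1.0, 1.0), "ntypat": 1, "title": "Si"}}
#   print(nmltostring(nml))
#
# which prints:
#
#   &cell
#   acell = 1.0,1.0,1.0,
#   ntypat = 1,
#   title = 'Si',
#   /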
90
91
92class TaskResults(NodeResults):
93
94    JSON_SCHEMA = NodeResults.JSON_SCHEMA.copy()
95    JSON_SCHEMA["properties"] = {
96        "executable": {"type": "string", "required": True},
97    }
98
99    @classmethod
100    def from_node(cls, task):
101        """Initialize an instance from an |AbinitTask| instance."""
102        new = super().from_node(task)
103
104        new.update(
105            executable=task.executable,
106            #executable_version:
107            #task_events=
108            pseudos=[p.as_dict() for p in task.input.pseudos],
109            #input=task.input
110        )
111
112        new.register_gridfs_files(
113            run_abi=(task.input_file.path, "t"),
114            run_abo=(task.output_file.path, "t"),
115        )
116
117        return new
118
119
120class ParalConf(AttrDict):
121    """
    This object stores the parameters associated with one
    of the possible parallel configurations reported by ABINIT.
    Essentially, it is a dictionary whose values can also be accessed
    as attributes. It also provides default values for selected keys
    that might not be present in the ABINIT dictionary.
127
128    Example:
129
130        --- !Autoparal
131        info:
132            version: 1
133            autoparal: 1
134            max_ncpus: 108
135        configurations:
136            -   tot_ncpus: 2         # Total number of CPUs
137                mpi_ncpus: 2         # Number of MPI processes.
138                omp_ncpus: 1         # Number of OMP threads (1 if not present)
139                mem_per_cpu: 10      # Estimated memory requirement per MPI processor in Megabytes.
140                efficiency: 0.4      # 1.0 corresponds to an "expected" optimal efficiency (strong scaling).
141                vars: {              # Dictionary with the variables that should be added to the input.
142                      varname1: varvalue1
143                      varname2: varvalue2
144                      }
145            -
146        ...
147
148    For paral_kgb we have:
149    nproc     npkpt  npspinor    npband     npfft    bandpp    weight
150       108       1         1        12         9         2        0.25
151       108       1         1       108         1         2       27.00
152        96       1         1        24         4         1        1.50
153        84       1         1        12         7         2        0.25
154    """
155    _DEFAULTS = {
156        "omp_ncpus": 1,
157        "mem_per_cpu": 0.0,
158        "vars": {}
159    }
160
161    def __init__(self, *args, **kwargs):
162        super().__init__(*args, **kwargs)
163
164        # Add default values if not already in self.
165        for k, v in self._DEFAULTS.items():
166            if k not in self:
167                self[k] = v
168
169    def __str__(self):
170        stream = StringIO()
171        pprint(self, stream=stream)
172        return stream.getvalue()
173
174    @property
175    def num_cores(self):
176        return self.mpi_procs * self.omp_threads
177
178    @property
179    def mem_per_proc(self):
180        return self.mem_per_cpu
181
182    @property
183    def mpi_procs(self):
184        return self.mpi_ncpus
185
186    @property
187    def omp_threads(self):
188        return self.omp_ncpus
189
190    @property
191    def speedup(self):
192        """Estimated speedup reported by ABINIT."""
193        return self.efficiency * self.num_cores
194
195    @property
196    def tot_mem(self):
197        """Estimated total memory in Mbs (computed from mem_per_proc)"""
198        return self.mem_per_proc * self.mpi_procs
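    # Sketch of how the derived quantities follow from an autoparal entry
    # (the numbers are made up for illustration):
    #
    #   conf = ParalConf(mpi_ncpus=12, omp_ncpus=2, efficiency=0.8, mem_per_cpu=500)
    #   conf.num_cores    # 12 * 2 = 24
    #   conf.speedup      # 0.8 * 24 = 19.2
    #   conf.tot_mem      # 500 * 12 = 6000 Mb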
199
200
201class ParalHintsError(Exception):
202    """Base error class for `ParalHints`."""
203
204
205class ParalHintsParser(object):
206
207    Error = ParalHintsError
208
209    def __init__(self):
210        # Used to push error strings.
211        self._errors = collections.deque(maxlen=100)
212
213    def add_error(self, errmsg):
214        self._errors.append(errmsg)
215
216    def parse(self, filename):
217        """
218        Read the `AutoParal` section (YAML format) from filename.
219        Assumes the file contains only one section.
220        """
221        with abiinspect.YamlTokenizer(filename) as r:
222            doc = r.next_doc_with_tag("!Autoparal")
223            try:
224                d = yaml.safe_load(doc.text_notag)
225                return ParalHints(info=d["info"], confs=d["configurations"])
226            except Exception:
227                import traceback
228                sexc = traceback.format_exc()
229                err_msg = "Wrong YAML doc:\n%s\n\nException:\n%s" % (doc.text, sexc)
230                self.add_error(err_msg)
231                raise self.Error(err_msg)
232
233
234class ParalHints(collections.abc.Iterable):
235    """
236    Iterable with the hints for the parallel execution reported by ABINIT.
237    """
238    Error = ParalHintsError
239
240    def __init__(self, info, confs):
241        self.info = info
242        self._confs = [ParalConf(**d) for d in confs]
243
244    @classmethod
245    def from_mpi_omp_lists(cls, mpi_procs, omp_threads):
246        """
247        Build a list of Parallel configurations from two lists
248        containing the number of MPI processes and the number of OpenMP threads
249        i.e. product(mpi_procs, omp_threads).
        The configurations have parallel efficiency set to 1.0 and no input variables.
251        Mainly used for preparing benchmarks.
252        """
253        info = {}
        confs = [ParalConf(mpi_ncpus=p, omp_ncpus=t, efficiency=1.0)
255                 for p, t in product(mpi_procs, omp_threads)]
256
257        return cls(info, confs)
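    # Usage sketch for benchmarks (the core counts are arbitrary):
    #
    #   hints = ParalHints.from_mpi_omp_lists(mpi_procs=[1, 2, 4], omp_threads=[1, 2])
    #   for conf in hints: print(conf.mpi_procs, conf.omp_threads, conf.efficiency)
    #
    # yields the six configurations of the Cartesian product, all with efficiency 1.0.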
258
259    def __getitem__(self, key):
260        return self._confs[key]
261
262    def __iter__(self):
263        return self._confs.__iter__()
264
265    def __len__(self):
266        return self._confs.__len__()
267
268    def __repr__(self):
269        return "\n".join(str(conf) for conf in self)
270
271    def __str__(self):
272        return repr(self)
273
274    @lazy_property
275    def max_cores(self):
276        """Maximum number of cores."""
277        return max(c.mpi_procs * c.omp_threads for c in self)
278
279    @lazy_property
280    def max_mem_per_proc(self):
281        """Maximum memory per MPI process."""
282        return max(c.mem_per_proc for c in self)
283
284    @lazy_property
285    def max_speedup(self):
286        """Maximum speedup."""
287        return max(c.speedup for c in self)
288
289    @lazy_property
290    def max_efficiency(self):
291        """Maximum parallel efficiency."""
292        return max(c.efficiency for c in self)
293
294    @pmg_serialize
295    def as_dict(self, **kwargs):
296        return {"info": self.info, "confs": self._confs}
297
298    @classmethod
299    def from_dict(cls, d):
300        return cls(info=d["info"], confs=d["confs"])
301
302    def copy(self):
303        """Shallow copy of self."""
304        return copy.copy(self)
305
306    def select_with_condition(self, condition, key=None):
307        """
308        Remove all the configurations that do not satisfy the given condition.
309
310        Args:
311            condition: dict or :class:`Condition` object with operators expressed with a Mongodb-like syntax
312            key: Selects the sub-dictionary on which condition is applied, e.g. key="vars"
313                 if we have to filter the configurations depending on the values in vars
314        """
315        condition = Condition.as_condition(condition)
316        new_confs = []
317
318        for conf in self:
319            # Select the object on which condition is applied
320            obj = conf if key is None else AttrDict(conf[key])
321            add_it = condition(obj=obj)
322            #if key is "vars": print("conf", conf, "added:", add_it)
323            if add_it: new_confs.append(conf)
324
325        self._confs = new_confs
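    # Sketch of the MongoDB-like syntax accepted by select_with_condition
    # (thresholds and variable names are illustrative):
    #
    #   hints.select_with_condition({"efficiency": {"$gte": 0.7}})
    #   # Filter on the autoparal variables instead of the configuration itself:
    #   hints.select_with_condition({"npfft": {"$eq": 1}}, key="vars")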
326
327    def sort_by_efficiency(self, reverse=True):
328        """Sort the configurations in place. items with highest efficiency come first"""
329        self._confs.sort(key=lambda c: c.efficiency, reverse=reverse)
330        return self
331
332    def sort_by_speedup(self, reverse=True):
333        """Sort the configurations in place. items with highest speedup come first"""
334        self._confs.sort(key=lambda c: c.speedup, reverse=reverse)
335        return self
336
337    def sort_by_mem_per_proc(self, reverse=False):
338        """Sort the configurations in place. items with lowest memory per proc come first."""
339        # Avoid sorting if mem_per_cpu is not available.
340        if any(c.mem_per_proc > 0.0 for c in self):
341            self._confs.sort(key=lambda c: c.mem_per_proc, reverse=reverse)
342        return self
343
344    def multidimensional_optimization(self, priorities=("speedup", "efficiency")):
345        # Mapping property --> options passed to sparse_histogram
        opts = dict(speedup=dict(step=1.0), efficiency=dict(step=0.1), mem_per_proc=dict(step=1024))
347        #opts = dict(zip(priorities, bin_widths))
348
349        opt_confs = self._confs
350        for priority in priorities:
351            histogram = SparseHistogram(opt_confs, key=lambda c: getattr(c, priority), **opts[priority])
352            pos = 0 if priority == "mem_per_proc" else -1
353            opt_confs = histogram.values[pos]
354
355        #histogram.plot(show=True, savefig="hello.pdf")
356        return self.__class__(info=self.info, confs=opt_confs)
357
358    #def histogram_efficiency(self, step=0.1):
359    #    """Returns a :class:`SparseHistogram` with configuration grouped by parallel efficiency."""
360    #    return SparseHistogram(self._confs, key=lambda c: c.efficiency, step=step)
361
362    #def histogram_speedup(self, step=1.0):
363    #    """Returns a :class:`SparseHistogram` with configuration grouped by parallel speedup."""
364    #    return SparseHistogram(self._confs, key=lambda c: c.speedup, step=step)
365
366    #def histogram_memory(self, step=1024):
367    #    """Returns a :class:`SparseHistogram` with configuration grouped by memory."""
368    #    return SparseHistogram(self._confs, key=lambda c: c.speedup, step=step)
369
370    #def filter(self, qadapter):
371    #    """Return a new list of configurations that can be executed on the `QueueAdapter` qadapter."""
372    #    new_confs = [pconf for pconf in self if qadapter.can_run_pconf(pconf)]
373    #    return self.__class__(info=self.info, confs=new_confs)
374
375    def get_ordered_with_policy(self, policy, max_ncpus):
376        """
377        Sort and return a new list of configurations ordered according to the :class:`TaskPolicy` policy.
378        """
        # Build a new list since we are going to change the object in place.
380        hints = self.__class__(self.info, confs=[c for c in self if c.num_cores <= max_ncpus])
381
382        # First select the configurations satisfying the condition specified by the user (if any)
383        bkp_hints = hints.copy()
384        if policy.condition:
385            logger.info("Applying condition %s" % str(policy.condition))
386            hints.select_with_condition(policy.condition)
387
            # Undo change if no configuration fulfills the requirements.
389            if not hints:
390                hints = bkp_hints
391                logger.warning("Empty list of configurations after policy.condition")
392
393        # Now filter the configurations depending on the values in vars
394        bkp_hints = hints.copy()
395        if policy.vars_condition:
396            logger.info("Applying vars_condition %s" % str(policy.vars_condition))
397            hints.select_with_condition(policy.vars_condition, key="vars")
398
            # Undo change if no configuration fulfills the requirements.
400            if not hints:
401                hints = bkp_hints
402                logger.warning("Empty list of configurations after policy.vars_condition")
403
404        if len(policy.autoparal_priorities) == 1:
405            # Example: hints.sort_by_speedup()
406            if policy.autoparal_priorities[0] in ['efficiency', 'speedup', 'mem_per_proc']:
407                getattr(hints, "sort_by_" + policy.autoparal_priorities[0])()
            elif isinstance(policy.autoparal_priorities[0], collections.abc.Mapping):
409                if policy.autoparal_priorities[0]['meta_priority'] == 'highest_speedup_minimum_efficiency_cutoff':
410                    min_efficiency = policy.autoparal_priorities[0].get('minimum_efficiency', 1.0)
411                    hints.select_with_condition({'efficiency': {'$gte': min_efficiency}})
412                    hints.sort_by_speedup()
413        else:
414            hints = hints.multidimensional_optimization(priorities=policy.autoparal_priorities)
415            if len(hints) == 0: raise ValueError("len(hints) == 0")
416
417        #TODO: make sure that num_cores == 1 is never selected when we have more than one configuration
418        #if len(hints) > 1:
419        #    hints.select_with_condition(dict(num_cores={"$eq": 1)))
420
        # Return the final (ordered) list of configurations (best first).
422        return hints
423
424
425class TaskPolicy(object):
426    """
427    This object stores the parameters used by the |TaskManager| to
428    create the submission script and/or to modify the ABINIT variables
429    governing the parallel execution. A `TaskPolicy` object contains
430    a set of variables that specify the launcher, as well as the options
    and the conditions used to select the optimal configuration for the parallel run.
432    """
433    @classmethod
434    def as_policy(cls, obj):
435        """
        Converts an object obj into a :class:`TaskPolicy`. Accepts:
437
438            * None
439            * TaskPolicy
440            * dict-like object
441        """
442        if obj is None:
443            # Use default policy.
444            return TaskPolicy()
445        else:
446            if isinstance(obj, cls):
447                return obj
448            elif isinstance(obj, collections.abc.Mapping):
449                return cls(**obj)
450            else:
451                raise TypeError("Don't know how to convert type %s to %s" % (type(obj), cls))
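    # Minimal sketch of the accepted inputs (the dict values are illustrative):
    #
    #   TaskPolicy.as_policy(None)                      # default policy
    #   TaskPolicy.as_policy({"autoparal": 1, "condition": {"efficiency": {"$gte": 0.5}}})
    #   TaskPolicy.as_policy(TaskPolicy(autoparal=0))   # returned as-is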
452
453    @classmethod
454    def autodoc(cls):
455        return """
456    autoparal:                # (integer). 0 to disable the autoparal feature (DEFAULT: 1 i.e. autoparal is on)
457    condition:                # condition used to filter the autoparal configurations (Mongodb-like syntax).
458                              # DEFAULT: empty i.e. ignored.
459    vars_condition:           # Condition used to filter the list of ABINIT variables reported by autoparal
460                              # (Mongodb-like syntax). DEFAULT: empty i.e. ignored.
461    frozen_timeout:           # A job is considered frozen and its status is set to ERROR if no change to
462                              # the output file has been done for `frozen_timeout` seconds. Accepts int with seconds or
463                              # string in slurm form i.e. days-hours:minutes:seconds. DEFAULT: 1 hour.
464    precedence:               # Under development.
465    autoparal_priorities:     # Under development.
466"""
467
468    def __init__(self, **kwargs):
469        """
470        See autodoc
471        """
472        self.autoparal = kwargs.pop("autoparal", 1)
473        self.condition = Condition(kwargs.pop("condition", {}))
474        self.vars_condition = Condition(kwargs.pop("vars_condition", {}))
475        self.precedence = kwargs.pop("precedence", "autoparal_conf")
476        self.autoparal_priorities = kwargs.pop("autoparal_priorities", ["speedup"])
        #self.autoparal_priorities = kwargs.pop("autoparal_priorities", ["speedup", "efficiency", "memory"])
478        # TODO frozen_timeout could be computed as a fraction of the timelimit of the qadapter!
479        self.frozen_timeout = qu.slurm_parse_timestr(kwargs.pop("frozen_timeout", "0-1:00:00"))
480
481        if kwargs:
482            raise ValueError("Found invalid keywords in policy section:\n %s" % str(kwargs.keys()))
483
484        # Consistency check.
485        if self.precedence not in ("qadapter", "autoparal_conf"):
486            raise ValueError("Wrong value for policy.precedence, should be qadapter or autoparal_conf")
487
488    def __str__(self):
489        lines = []
490        app = lines.append
491        for k, v in self.__dict__.items():
492            if k.startswith("_"): continue
493            app("%s: %s" % (k, v))
494        return "\n".join(lines)
495
496
497class ManagerIncreaseError(Exception):
498    """
499    Exception raised by the manager if the increase request failed
500    """
501
502
503class FixQueueCriticalError(Exception):
504    """
505    Error raised when an error could not be fixed at the task level
506    """
507
508
509# Global variable used to store the task manager returned by `from_user_config`.
510_USER_CONFIG_TASKMANAGER = None
511
512
513def set_user_config_taskmanager(manager):
514    """Change the default manager returned by TaskManager.from_user_config."""
515    global _USER_CONFIG_TASKMANAGER
516    _USER_CONFIG_TASKMANAGER = manager
517
518
519class TaskManager(MSONable):
520    """
521    A `TaskManager` is responsible for the generation of the job script and the submission
522    of the task, as well as for the specification of the parameters passed to the resource manager
523    (e.g. Slurm, PBS ...) and/or the run-time specification of the ABINIT variables governing the parallel execution.
524    A `TaskManager` delegates the generation of the submission script and the submission of the task to the :class:`QueueAdapter`.
525    A `TaskManager` has a :class:`TaskPolicy` that governs the specification of the parameters for the parallel executions.
526    Ideally, the TaskManager should be the **main entry point** used by the task to deal with job submission/optimization
527    """
528    YAML_FILE = "manager.yml"
529    USER_CONFIG_DIR = os.path.join(os.path.expanduser("~"), ".abinit", "abipy")
530
531    ENTRIES = {"policy", "qadapters", "db_connector", "batch_adapter"}
532
533    @classmethod
534    def autodoc(cls):
535        s = """
536# TaskManager configuration file (YAML Format)
537
538policy:
539    # Dictionary with options used to control the execution of the tasks.
540
541qadapters:
542    # List of qadapters objects (mandatory)
543    -  # qadapter_1
544    -  # qadapter_2
545
546db_connector:
547    # Connection to MongoDB database (optional)
548
549batch_adapter:
550    # Adapter used to submit flows with batch script. (optional)
551
552##########################################
553# Individual entries are documented below:
554##########################################
555
556"""
557        s += "policy: " + TaskPolicy.autodoc() + "\n"
558        s += "qadapter: " + QueueAdapter.autodoc() + "\n"
559        return s
560
561    @classmethod
562    def from_user_config(cls):
563        """
        Initialize the |TaskManager| from the YAML file 'manager.yml'.
565        Search first in the working directory and then in the AbiPy configuration directory.
566
567        Raises:
568            RuntimeError if file is not found.
569        """
570        global _USER_CONFIG_TASKMANAGER
571        if _USER_CONFIG_TASKMANAGER is not None:
572            return _USER_CONFIG_TASKMANAGER
573
574        # Try in the current directory then in user configuration directory.
575        path = os.path.join(os.getcwd(), cls.YAML_FILE)
576        if not os.path.exists(path):
577            path = os.path.join(cls.USER_CONFIG_DIR, cls.YAML_FILE)
578
579        if not os.path.exists(path):
580            raise RuntimeError(colored(
581                "\nCannot locate %s neither in current directory nor in %s\n"
582                "PLEASE READ THIS: !!!\n"
583                "To use AbiPy to run jobs this file must be present\n"
584                "It provides a description of the cluster/computer you are running on\n"
585                "Examples are provided in abipy/data/managers.\n"
586                "Use `abidoc.py manager` to access the documentation in the terminal.\n"
587                "See also: https://abinit.github.io/abipy/workflows/manager_examples.html\n" % (
588                    cls.YAML_FILE, path), color="red"))
589
590        _USER_CONFIG_TASKMANAGER = cls.from_file(path)
591        return _USER_CONFIG_TASKMANAGER
592
593    @classmethod
594    def from_file(cls, filename):
595        """Read the configuration parameters from the Yaml file filename."""
596        try:
597            with open(filename, "rt") as fh:
598                return cls.from_dict(yaml.safe_load(fh))
599        except Exception as exc:
600            print("Error while reading TaskManager parameters from %s\n" % filename)
601            raise
602
603    @classmethod
604    def from_string(cls, s):
605        """Create an instance from string s containing a YAML dictionary."""
606        return cls.from_dict(yaml.safe_load(s))
607
608    @classmethod
609    def as_manager(cls, obj):
610        """
611        Convert obj into TaskManager instance. Accepts string, filepath, dictionary, `TaskManager` object.
612        If obj is None, the manager is initialized from the user config file.
613        """
614        if isinstance(obj, cls): return obj
615        if obj is None: return cls.from_user_config()
616
617        if is_string(obj):
618            if os.path.exists(obj):
619                return cls.from_file(obj)
620            else:
621                return cls.from_string(obj)
622
623        elif isinstance(obj, collections.abc.Mapping):
624            return cls.from_dict(obj)
625        else:
626            raise TypeError("Don't know how to convert type %s to TaskManager" % type(obj))
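    # Dispatch sketch (the filename below is hypothetical):
    #
    #   TaskManager.as_manager(None)                   # from_user_config (manager.yml)
    #   TaskManager.as_manager("my_manager.yml")       # from_file if the path exists, else from_string
    #   TaskManager.as_manager({"qadapters": [...]})   # from_dict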
627
628    @classmethod
629    def from_dict(cls, d):
630        """Create an instance from a dictionary."""
631        return cls(**{k: v for k, v in d.items() if k in cls.ENTRIES})
632
633    @pmg_serialize
634    def as_dict(self):
635        return copy.deepcopy(self._kwargs)
636
637    def __init__(self, **kwargs):
638        """
639        Args:
            policy: |TaskPolicy| object or dict (None means that the default policy is used).
            qadapters: List of qadapters in YAML format.
            db_connector: Dictionary with the data used to connect to the database (optional).
643        """
644        # Keep a copy of kwargs
645        self._kwargs = copy.deepcopy(kwargs)
646
647        self.policy = TaskPolicy.as_policy(kwargs.pop("policy", None))
648
649        # Initialize database connector (if specified)
650        self.db_connector = DBConnector(**kwargs.pop("db_connector", {}))
651
        # Build the list of QAdapters. Neglect an entry if priority == 0 or `enabled: no`.
653        qads = []
654        for d in kwargs.pop("qadapters"):
655            if d.get("enabled", False): continue
656            qad = make_qadapter(**d)
657            if qad.priority > 0:
658                qads.append(qad)
659            elif qad.priority < 0:
660                raise ValueError("qadapter cannot have negative priority:\n %s" % qad)
661
662        if not qads:
663            raise ValueError("Received emtpy list of qadapters")
664        #if len(qads) != 1:
665        #    raise NotImplementedError("For the time being multiple qadapters are not supported! Please use one adapter")
666
        # Order qadapters according to priority.
668        qads = sorted(qads, key=lambda q: q.priority)
669        priorities = [q.priority for q in qads]
670        if len(priorities) != len(set(priorities)):
671            raise ValueError("Two or more qadapters have same priority. This is not allowed. Check taskmanager.yml")
672
673        self._qads, self._qadpos = tuple(qads), 0
674
675        # Initialize the qadapter for batch script submission.
676        d = kwargs.pop("batch_adapter", None)
677        self.batch_adapter = None
678        if d: self.batch_adapter = make_qadapter(**d)
679        #print("batch_adapter", self.batch_adapter)
680
681        if kwargs:
682            raise ValueError("Found invalid keywords in the taskmanager file:\n %s" % str(list(kwargs.keys())))
683
684    @lazy_property
685    def abinit_build(self):
686        """:class:`AbinitBuild` object with Abinit version and options used to build the code"""
687        return AbinitBuild(manager=self)
688
689    def to_shell_manager(self, mpi_procs=1):
690        """
691        Returns a new |TaskManager| with the same parameters as self but replace the :class:`QueueAdapter`
692        with a :class:`ShellAdapter` with mpi_procs so that we can submit the job without passing through the queue.
693        """
694        my_kwargs = copy.deepcopy(self._kwargs)
695        my_kwargs["policy"] = TaskPolicy(autoparal=0)
696
697        # On BlueGene we need at least two qadapters.
698        # One for running jobs on the computing nodes and another one
        # for running small jobs on the frontend. These two qadapters
        # will have different environments and different executables.
        # If none of the q-adapters has qtype == shell, we change qtype to shell
702        # and we return a new Manager for sequential jobs with the same parameters as self.
703        # If the list contains a qadapter with qtype == shell, we ignore the remaining qadapters
704        # when we build the new Manager.
705        has_shell_qad = False
706        for d in my_kwargs["qadapters"]:
707            if d["queue"]["qtype"] == "shell": has_shell_qad = True
708        if has_shell_qad:
709            my_kwargs["qadapters"] = [d for d in my_kwargs["qadapters"] if d["queue"]["qtype"] == "shell"]
710
711        for d in my_kwargs["qadapters"]:
712            d["queue"]["qtype"] = "shell"
713            d["limits"]["min_cores"] = mpi_procs
714            d["limits"]["max_cores"] = mpi_procs
715
716            # If shell_runner is specified, replace mpi_runner with shell_runner
717            # in the script used to run jobs on the frontend.
            # On some machines based on Slurm, indeed, mpirun/mpiexec is not available
            # and jobs should be executed with `srun -n4 exec` when running on the computing nodes
            # or with `exec` when running sequentially on the frontend.
721            if "job" in d and "shell_runner" in d["job"]:
722                shell_runner = d["job"]["shell_runner"]
723                #print("shell_runner:", shell_runner, type(shell_runner))
724                if not shell_runner or shell_runner == "None": shell_runner = ""
725                d["job"]["mpi_runner"] = shell_runner
726                #print("shell_runner:", shell_runner)
727
728        #print(my_kwargs)
729        new = self.__class__(**my_kwargs)
730        new.set_mpi_procs(mpi_procs)
731
732        return new
733
734    def new_with_fixed_mpi_omp(self, mpi_procs, omp_threads):
735        """
736        Return a new `TaskManager` in which autoparal has been disabled.
737        The jobs will be executed with `mpi_procs` MPI processes and `omp_threads` OpenMP threads.
738        Useful for generating input files for benchmarks.
739        """
740        new = self.deepcopy()
741        new.policy.autoparal = 0
742        new.set_mpi_procs(mpi_procs)
743        new.set_omp_threads(omp_threads)
744        return new
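    # Usage sketch for benchmark inputs (the MPI/OpenMP values are arbitrary):
    #
    #   fixed_manager = manager.new_with_fixed_mpi_omp(mpi_procs=8, omp_threads=2)
    #   assert fixed_manager.policy.autoparal == 0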
745
746    @property
747    def has_queue(self):
748        """True if we are submitting jobs via a queue manager."""
749        return self.qadapter.QTYPE.lower() != "shell"
750
751    @property
752    def qads(self):
753        """List of :class:`QueueAdapter` objects sorted according to priorities (highest comes first)"""
754        return self._qads
755
756    @property
757    def qadapter(self):
758        """The qadapter used to submit jobs."""
759        return self._qads[self._qadpos]
760
761    def select_qadapter(self, pconfs):
762        """
        Given a list of parallel configurations, pconfs, this method selects an `optimal` configuration
764        according to some criterion as well as the :class:`QueueAdapter` to use.
765
766        Args:
767            pconfs: :class:`ParalHints` object with the list of parallel configurations
768
769        Returns:
            :class:`ParalConf` object with the `optimal` configuration.
771        """
772        # Order the list of configurations according to policy.
773        policy, max_ncpus = self.policy, self.max_cores
774        pconfs = pconfs.get_ordered_with_policy(policy, max_ncpus)
775
776        if policy.precedence == "qadapter":
777
778            # Try to run on the qadapter with the highest priority.
779            for qadpos, qad in enumerate(self.qads):
780                possible_pconfs = [pc for pc in pconfs if qad.can_run_pconf(pc)]
781
782                if qad.allocation == "nodes":
783                    #if qad.allocation in ["nodes", "force_nodes"]:
784                    # Select the configuration divisible by nodes if possible.
785                    for pconf in possible_pconfs:
786                        if pconf.num_cores % qad.hw.cores_per_node == 0:
787                            return self._use_qadpos_pconf(qadpos, pconf)
788
789                # Here we select the first one.
790                if possible_pconfs:
791                    return self._use_qadpos_pconf(qadpos, possible_pconfs[0])
792
793        elif policy.precedence == "autoparal_conf":
794            # Try to run on the first pconf irrespectively of the priority of the qadapter.
795            for pconf in pconfs:
796                for qadpos, qad in enumerate(self.qads):
797
798                    if qad.allocation == "nodes" and not pconf.num_cores % qad.hw.cores_per_node == 0:
799                        continue # Ignore it. not very clean
800
801                    if qad.can_run_pconf(pconf):
802                        return self._use_qadpos_pconf(qadpos, pconf)
803
804        else:
805            raise ValueError("Wrong value of policy.precedence = %s" % policy.precedence)
806
807        # No qadapter could be found
808        raise RuntimeError("Cannot find qadapter for this run!")
809
810    def _use_qadpos_pconf(self, qadpos, pconf):
811        """
812        This function is called when we have accepted the :class:`ParalConf` pconf.
813        Returns pconf
814        """
815        self._qadpos = qadpos
816
817        # Change the number of MPI/OMP cores.
818        self.set_mpi_procs(pconf.mpi_procs)
819        if self.has_omp: self.set_omp_threads(pconf.omp_threads)
820
821        # Set memory per proc.
        #FIXME: Fixer may have changed the memory per proc and it should not be reset by ParalConf
823        #self.set_mem_per_proc(pconf.mem_per_proc)
824        return pconf
825
826    def __str__(self):
827        """String representation."""
828        lines = []
829        app = lines.append
830        #app("[Task policy]\n%s" % str(self.policy))
831
832        for i, qad in enumerate(self.qads):
833            app("[Qadapter %d]\n%s" % (i, str(qad)))
834        app("Qadapter selected: %d" % self._qadpos)
835
836        if self.has_db:
837            app("[MongoDB database]:")
838            app(str(self.db_connector))
839
840        return "\n".join(lines)
841
842    @property
843    def has_db(self):
844        """True if we are using MongoDB database"""
845        return bool(self.db_connector)
846
847    @property
848    def has_omp(self):
849        """True if we are using OpenMP parallelization."""
850        return self.qadapter.has_omp
851
852    @property
853    def num_cores(self):
854        """Total number of CPUs used to run the task."""
855        return self.qadapter.num_cores
856
857    @property
858    def mpi_procs(self):
859        """Number of MPI processes."""
860        return self.qadapter.mpi_procs
861
862    @property
863    def mem_per_proc(self):
864        """Memory per MPI process."""
865        return self.qadapter.mem_per_proc
866
867    @property
868    def omp_threads(self):
869        """Number of OpenMP threads"""
870        return self.qadapter.omp_threads
871
872    def deepcopy(self):
873        """Deep copy of self."""
874        return copy.deepcopy(self)
875
876    def set_mpi_procs(self, mpi_procs):
877        """Set the number of MPI processes to use."""
878        self.qadapter.set_mpi_procs(mpi_procs)
879
880    def set_omp_threads(self, omp_threads):
881        """Set the number of OpenMp threads to use."""
882        self.qadapter.set_omp_threads(omp_threads)
883
884    def set_mem_per_proc(self, mem_mb):
885        """Set the memory (in Megabytes) per CPU."""
886        self.qadapter.set_mem_per_proc(mem_mb)
887
888    @property
889    def max_cores(self):
890        """
891        Maximum number of cores that can be used.
892        This value is mainly used in the autoparal part to get the list of possible configurations.
893        """
894        return max(q.hint_cores for q in self.qads)
895
896    def get_njobs_in_queue(self, username=None):
897        """
        Returns the number of jobs in the queue,
        or None when the number of jobs cannot be determined.
900
901        Args:
902            username: (str) the username of the jobs to count (default is to autodetect)
903        """
904        return self.qadapter.get_njobs_in_queue(username=username)
905
906    def cancel(self, job_id):
907        """Cancel the job. Returns exit status."""
908        return self.qadapter.cancel(job_id)
909
910    def write_jobfile(self, task, **kwargs):
911        """
912        Write the submission script. Return the path of the script
913
914        ================  ============================================
915        kwargs            Meaning
916        ================  ============================================
917        exec_args         List of arguments passed to task.executable.
918                          Default: no arguments.
919
920        ================  ============================================
921        """
922        script = self.qadapter.get_script_str(
923            job_name=task.name,
924            launch_dir=task.workdir,
925            executable=task.executable,
926            qout_path=task.qout_file.path,
927            qerr_path=task.qerr_file.path,
928            stdin=task.files_file.path,
929            stdout=task.log_file.path,
930            stderr=task.stderr_file.path,
931            exec_args=kwargs.pop("exec_args", []),
932        )
933
934        # Write the script.
935        with open(task.job_file.path, "w") as fh:
936            fh.write(script)
937            task.job_file.chmod(0o740)
938            return task.job_file.path
939
940    def launch(self, task, **kwargs):
941        """
942        Build the input files and submit the task via the :class:`Qadapter`
943
944        Args:
945            task: |Task| object.
946
947        Returns:
948            Process object.
949        """
950        if task.status == task.S_LOCKED:
951            raise ValueError("You shall not submit a locked task!")
952
953        # Build the task
954        task.build()
955
956        # Pass information on the time limit to Abinit (we always assume ndtset == 1)
957        if isinstance(task, AbinitTask):
958            args = kwargs.get("exec_args", [])
959            if args is None: args = []
960            args = args[:]
961            args.append("--timelimit %s" % qu.time2slurm(self.qadapter.timelimit))
962            kwargs["exec_args"] = args
963
964        # Write the submission script
965        script_file = self.write_jobfile(task, **kwargs)
966
967        # Submit the task and save the queue id.
968        try:
969            qjob, process = self.qadapter.submit_to_queue(script_file)
970            task.set_status(task.S_SUB, msg='Submitted to queue')
971            task.set_qjob(qjob)
972            return process
973
974        except self.qadapter.MaxNumLaunchesError as exc:
975            # TODO: Here we should try to switch to another qadapter
976            # 1) Find a new parallel configuration in those stored in task.pconfs
977            # 2) Change the input file.
978            # 3) Regenerate the submission script
979            # 4) Relaunch
980            task.set_status(task.S_ERROR, msg="max_num_launches reached: %s" % str(exc))
981            raise
982
983    def get_collection(self, **kwargs):
984        """Return the MongoDB collection used to store the results."""
985        return self.db_connector.get_collection(**kwargs)
986
987    def increase_mem(self):
988        # OLD
989        # with GW calculations in mind with GW mem = 10,
        # the response function is in memory and not distributed,
        # so we need to increase the memory if jobs fail ...
992        # return self.qadapter.more_mem_per_proc()
993        try:
994            self.qadapter.more_mem_per_proc()
995        except QueueAdapterError:
996            # here we should try to switch to an other qadapter
997            raise ManagerIncreaseError('manager failed to increase mem')
998
999    def increase_ncpus(self):
1000        """
        Increase the number of CPUs: first ask the current qadapter; if that one raises a QueueAdapterError,
        switch to the next qadapter. If all fail, raise a ManagerIncreaseError.
1003        """
1004        try:
1005            self.qadapter.more_cores()
1006        except QueueAdapterError:
1007            # here we should try to switch to an other qadapter
1008            raise ManagerIncreaseError('manager failed to increase ncpu')
1009
1010    def increase_resources(self):
1011        try:
1012            self.qadapter.more_cores()
1013            return
1014        except QueueAdapterError:
1015            pass
1016
1017        try:
1018            self.qadapter.more_mem_per_proc()
1019        except QueueAdapterError:
1020            # here we should try to switch to an other qadapter
1021            raise ManagerIncreaseError('manager failed to increase resources')
1022
1023    def exclude_nodes(self, nodes):
1024        try:
1025            self.qadapter.exclude_nodes(nodes=nodes)
1026        except QueueAdapterError:
1027            # here we should try to switch to an other qadapter
1028            raise ManagerIncreaseError('manager failed to exclude nodes')
1029
1030    def increase_time(self):
1031        try:
1032            self.qadapter.more_time()
1033        except QueueAdapterError:
1034            # here we should try to switch to an other qadapter
1035            raise ManagerIncreaseError('manager failed to increase time')
1036
1037
1038class AbinitBuild(object):
1039    """
1040    This object stores information on the options used to build Abinit
1041
1042        .. attribute:: info
1043            String with build information as produced by `abinit -b`
1044
1045        .. attribute:: version
            Abinit version number, e.g. 8.0.1 (string).
1047
1048        .. attribute:: has_netcdf
1049            True if netcdf is enabled.
1050
1051        .. attribute:: has_libxc
1052            True if libxc is enabled.
1053
1054        .. attribute:: has_omp
1055            True if OpenMP is enabled.
1056
1057        .. attribute:: has_mpi
1058            True if MPI is enabled.
1059
1060        .. attribute:: has_mpiio
1061            True if MPI-IO is supported.
1062    """
1063    def __init__(self, workdir=None, manager=None):
1064        manager = TaskManager.as_manager(manager).to_shell_manager(mpi_procs=1)
1065
1066        # Build a simple manager to run the job in a shell subprocess
1067        import tempfile
1068        workdir = tempfile.mkdtemp() if workdir is None else workdir
1069
1070        # Generate a shell script to execute `abinit -b`
1071        stdout = os.path.join(workdir, "run.abo")
1072        script = manager.qadapter.get_script_str(
1073            job_name="abinit_b",
1074            launch_dir=workdir,
1075            executable="abinit",
1076            qout_path=os.path.join(workdir, "queue.qout"),
1077            qerr_path=os.path.join(workdir, "queue.qerr"),
1078            #stdin=os.path.join(workdir, "run.files"),
1079            stdout=stdout,
1080            stderr=os.path.join(workdir, "run.err"),
1081            exec_args=["-b"],
1082        )
1083
1084        # Execute the script.
1085        script_file = os.path.join(workdir, "job.sh")
1086        with open(script_file, "wt") as fh:
1087            fh.write(script)
1088        qjob, process = manager.qadapter.submit_to_queue(script_file)
1089        process.wait()
1090
1091        if process.returncode != 0:
1092            logger.critical("Error while executing %s" % script_file)
1093            print("stderr:\n", process.stderr.read())
1094            #print("stdout:", process.stdout.read())
1095
1096        # To avoid: ResourceWarning: unclosed file <_io.BufferedReader name=87> in py3k
1097        process.stderr.close()
1098
1099        with open(stdout, "rt") as fh:
1100            self.info = fh.read()
1101
1102        # info string has the following format.
1103        """
1104        === Build Information ===
1105         Version       : 8.0.1
1106         Build target  : x86_64_darwin15.0.0_gnu5.3
1107         Build date    : 20160122
1108
1109        === Compiler Suite ===
1110         C compiler       : gnu
1111         C++ compiler     : gnuApple
1112         Fortran compiler : gnu5.3
1113         CFLAGS           :  -g -O2 -mtune=native -march=native
1114         CXXFLAGS         :  -g -O2 -mtune=native -march=native
1115         FCFLAGS          :  -g -ffree-line-length-none
1116         FC_LDFLAGS       :
1117
1118        === Optimizations ===
1119         Debug level        : basic
1120         Optimization level : standard
1121         Architecture       : unknown_unknown
1122
1123        === Multicore ===
1124         Parallel build : yes
1125         Parallel I/O   : yes
1126         openMP support : no
1127         GPU support    : no
1128
1129        === Connectors / Fallbacks ===
1130         Connectors on : yes
1131         Fallbacks on  : yes
1132         DFT flavor    : libxc-fallback+atompaw-fallback+wannier90-fallback
1133         FFT flavor    : none
1134         LINALG flavor : netlib
1135         MATH flavor   : none
1136         TIMER flavor  : abinit
1137         TRIO flavor   : netcdf+etsf_io-fallback
1138
1139        === Experimental features ===
1140         Bindings            : @enable_bindings@
1141         Exports             : no
1142         GW double-precision : yes
1143
1144        === Bazaar branch information ===
1145         Branch ID : gmatteo@gmac-20160112110440-lf6exhneqim9082h
1146         Revision  : 1226
1147         Committed : 0
1148        """
1149        self.version = "0.0.0"
1150        self.has_netcdf = False
1151        self.has_omp = False
1152        self.has_mpi, self.has_mpiio = False, False
1153        self.has_libxc = False
1154
1155        def yesno2bool(line):
1156            ans = line.split()[-1].lower()
1157            try:
1158                return dict(yes=True, no=False, auto=True)[ans]
1159            except KeyError:
1160                # Temporary hack for abinit v9
1161                return True
1162
1163        # Parse info.
1164        # flavor options were used in Abinit v8
1165        for line in self.info.splitlines():
1166            if "Version" in line: self.version = line.split()[-1]
1167            if "TRIO flavor" in line:
1168                self.has_netcdf = "netcdf" in line
1169            if "NetCDF Fortran" in line:
1170                self.has_netcdf = yesno2bool(line)
1171            if "DFT flavor" in line:
1172                self.has_libxc = "libxc" in line
1173            if "LibXC" in line:
1174                self.has_libxc = yesno2bool(line)
1175            if "openMP support" in line:
1176                self.has_omp = yesno2bool(line)
1177            if "Parallel build" in line:
1178                ans = line.split()[-1].lower()
1179                if ans == "@enable_mpi@":
1180                    # Temporary hack for abinit v9
1181                    self.has_mpi = True
1182                else:
1183                    self.has_mpi = yesno2bool(line)
1184            if "Parallel I/O" in line:
1185                self.has_mpiio = yesno2bool(line)
1186
1187        # Temporary hack for abinit v9
1188        #from abipy.core.testing import cmp_version
1189        #if cmp_version(self.version, "9.0.0", op=">="):
1190        #    self.has_netcdf = True
1191
1192    def __str__(self):
1193        lines = []
1194        app = lines.append
1195        app("Abinit Build Information:")
1196        app("    Abinit version: %s" % self.version)
1197        app("    MPI: %s, MPI-IO: %s, OpenMP: %s" % (self.has_mpi, self.has_mpiio, self.has_omp))
1198        app("    Netcdf: %s" % self.has_netcdf)
1199        return "\n".join(lines)
1200
1201    def version_ge(self, version_string):
1202        """True is Abinit version is >= version_string"""
1203        return self.compare_version(version_string, ">=")
1204
1205    def compare_version(self, version_string, op):
1206        """Compare Abinit version to `version_string` with operator `op`"""
1207        from pkg_resources import parse_version
1208        from monty.operator import operator_from_str
1209        op = operator_from_str(op)
1210        return op(parse_version(self.version), parse_version(version_string))
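    # Sketch of a typical check performed by workflow code (the version strings are examples):
    #
    #   build = AbinitBuild()    # runs `abinit -b` in a temporary directory
    #   if build.has_netcdf and build.version_ge("8.0.1"):
    #       ...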
1211
1212
1213class FakeProcess(object):
1214    """
    This object is attached to a |Task| instance if the task has not been submitted.
1216    This trick allows us to simulate a process that is still running so that
1217    we can safely poll task.process.
1218    """
1219    def poll(self):
1220        return None
1221
1222    def wait(self):
1223        raise RuntimeError("Cannot wait a FakeProcess")
1224
1225    def communicate(self, input=None):
1226        raise RuntimeError("Cannot communicate with a FakeProcess")
1227
1228    def kill(self):
1229        raise RuntimeError("Cannot kill a FakeProcess")
1230
1231    @property
1232    def returncode(self):
1233        return None
1234
1235
1236class MyTimedelta(datetime.timedelta):
1237    """A customized version of timedelta whose __str__ method doesn't print microseconds."""
1238    def __new__(cls, days, seconds, microseconds):
1239        return datetime.timedelta.__new__(cls, days, seconds, microseconds)
1240
1241    def __str__(self):
1242        """Remove microseconds from timedelta default __str__"""
1243        s = super().__str__()
1244        microsec = s.find(".")
1245        if microsec != -1: s = s[:microsec]
1246        return s
1247
1248    @classmethod
1249    def as_timedelta(cls, delta):
1250        """Convert delta into a MyTimedelta object."""
1251        # Cannot monkey patch the __class__ and must pass through __new__ as the object is immutable.
1252        if isinstance(delta, cls): return delta
1253        return cls(delta.days, delta.seconds, delta.microseconds)
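    # Example of the microsecond stripping (the duration is arbitrary):
    #
    #   delta = datetime.timedelta(hours=1, minutes=2, seconds=3, microseconds=456)
    #   str(MyTimedelta.as_timedelta(delta))   # "1:02:03" instead of "1:02:03.000456"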
1254
1255
1256class TaskDateTimes(object):
1257    """
    Small object containing the :class:`datetime.datetime` objects associated with important events.
1259
1260    .. attributes:
1261
1262        init: initialization datetime
1263        submission: submission datetime
1264        start: Begin of execution.
1265        end: End of execution.
1266    """
1267    def __init__(self):
1268        self.init = datetime.datetime.now()
1269        self.submission, self.start, self.end = None, None, None
1270
1271    def __str__(self):
1272        lines = []
1273        app = lines.append
1274
1275        app("Initialization done on: %s" % self.init)
1276        if self.submission is not None: app("Submitted on: %s" % self.submission)
1277        if self.start is not None: app("Started on: %s" % self.start)
1278        if self.end is not None: app("Completed on: %s" % self.end)
1279
1280        return "\n".join(lines)
1281
1282    def reset(self):
1283        """Reinitialize the counters."""
        self.__init__()
1285
1286    def get_runtime(self):
1287        """:class:`timedelta` with the run-time, None if the Task is not running"""
1288        if self.start is None: return None
1289
1290        if self.end is None:
1291            delta = datetime.datetime.now() - self.start
1292        else:
1293            delta = self.end - self.start
1294
1295        return MyTimedelta.as_timedelta(delta)
1296
1297    def get_time_inqueue(self):
1298        """
        :class:`timedelta` with the time spent in the Queue. None if the Task has not been submitted.
1300
1301        .. note:
1302
1303            This value is always greater than the real value computed by the resource manager
1304            as we start to count only when check_status sets the `Task` status to S_RUN.
1305        """
1306        if self.submission is None: return None
1307
1308        if self.start is None:
1309            delta = datetime.datetime.now() - self.submission
1310        else:
1311            delta = self.start - self.submission
1312            # This happens when we read the exact start datetime from the ABINIT log file.
1313            if delta.total_seconds() < 0: delta = datetime.timedelta(seconds=0)
1314
1315        return MyTimedelta.as_timedelta(delta)
1316
1317
1318class TaskError(NodeError):
1319    """Base Exception for |Task| methods"""
1320
1321
1322class TaskRestartError(TaskError):
1323    """Exception raised while trying to restart the |Task|."""
1324
1325
1326class Task(Node, metaclass=abc.ABCMeta):
1327    """
1328    A Task is a node that performs some kind of calculation.
    This is the base class providing low-level methods.
1330    """
1331    # Use class attributes for TaskErrors so that we don't have to import them.
1332    Error = TaskError
1333    RestartError = TaskRestartError
1334
1335    # List of `AbinitEvent` subclasses that are tested in the check_status method.
1336    # Subclasses should provide their own list if they need to check the converge status.
1337    CRITICAL_EVENTS = []
1338
1339    # Prefixes for Abinit (input, output, temporary) files.
1340    Prefix = collections.namedtuple("Prefix", "idata odata tdata")
1341    pj = os.path.join
1342
1343    prefix = Prefix(pj("indata", "in"), pj("outdata", "out"), pj("tmpdata", "tmp"))
1344    del Prefix, pj
1345
1346    def __init__(self, input, workdir=None, manager=None, deps=None):
1347        """
1348        Args:
1349            input: |AbinitInput| object.
1350            workdir: Path to the working directory.
1351            manager: |TaskManager| object.
1352            deps: Dictionary specifying the dependency of this node.
1353                  None means that this Task has no dependency.
1354        """
1355        # Init the node
1356        super().__init__()
1357
1358        self._input = input
1359
1360        if workdir is not None:
1361            self.set_workdir(workdir)
1362
1363        if manager is not None:
1364            self.set_manager(manager)
1365
1366        # Handle possible dependencies.
1367        if deps:
1368            self.add_deps(deps)
1369
1370        # Date-time associated to submission, start and end.
1371        self.datetimes = TaskDateTimes()
1372
1373        # Count the number of restarts.
1374        self.num_restarts = 0
1375
1376        self._qjob = None
1377        self.queue_errors = []
1378        self.abi_errors = []
1379
        # Two flags that provide, dynamically, information on the scaling behaviour of a task. If any fixing process
        # finds non-scaling behaviour, they should be switched. If a task type is clearly not scaling, they should be
        # switched.
1383        self.mem_scales = True
1384        self.load_scales = True
1385
1386    def __getstate__(self):
1387        """
        The returned state is pickled as the contents of the instance.
1389
1390        In this case we just remove the process since Subprocess objects cannot be pickled.
1391        This is the reason why we have to store the returncode in self._returncode instead
1392        of using self.process.returncode.
1393        """
1394        return {k: v for k, v in self.__dict__.items() if k not in ["_process"]}
1395
1396    #@check_spectator
1397    def set_workdir(self, workdir, chroot=False):
1398        """Set the working directory. Cannot be set more than once unless chroot is True"""
1399        if not chroot and hasattr(self, "workdir") and self.workdir != workdir:
1400            raise ValueError("self.workdir != workdir: %s, %s" % (self.workdir,  workdir))
1401
1402        self.workdir = os.path.abspath(workdir)
1403
1404        # Files required for the execution.
1405        self.input_file = File(os.path.join(self.workdir, "run.abi"))
1406        self.output_file = File(os.path.join(self.workdir, "run.abo"))
1407        self.files_file = File(os.path.join(self.workdir, "run.files"))
1408        self.job_file = File(os.path.join(self.workdir, "job.sh"))
1409        self.log_file = File(os.path.join(self.workdir, "run.log"))
1410        self.stderr_file = File(os.path.join(self.workdir, "run.err"))
1411        self.start_lockfile = File(os.path.join(self.workdir, "__startlock__"))
1412        # This file is produced by Abinit if nprocs > 1 and MPI_ABORT.
1413        self.mpiabort_file = File(os.path.join(self.workdir, "__ABI_MPIABORTFILE__"))
1414
1415        # Directories with input|output|temporary data.
1416        self.wdir = Directory(self.workdir)
1417        self.indir = Directory(os.path.join(self.workdir, "indata"))
1418        self.outdir = Directory(os.path.join(self.workdir, "outdata"))
1419        self.tmpdir = Directory(os.path.join(self.workdir, "tmpdata"))
1420
1421        # stderr and output file of the queue manager. Note extensions.
1422        self.qerr_file = File(os.path.join(self.workdir, "queue.qerr"))
1423        self.qout_file = File(os.path.join(self.workdir, "queue.qout"))
1424
1425    def set_manager(self, manager):
1426        """Set the |TaskManager| used to launch the Task."""
1427        self.manager = manager.deepcopy()
1428
1429    @property
1430    def work(self):
1431        """The |Work| containing this `Task`."""
1432        return self._work
1433
1434    def set_work(self, work):
1435        """Set the |Work| associated to this |Task|."""
1436        if not hasattr(self, "_work"):
1437            self._work = work
1438        else:
1439            if self._work != work:
1440                raise ValueError("self._work != work")
1441
1442    @property
1443    def flow(self):
1444        """The |Flow| containing this |Task|."""
1445        return self.work.flow
1446
1447    @lazy_property
1448    def pos(self):
1449        """The position of the task in the |Flow|"""
1450        for i, task in enumerate(self.work):
1451            if self == task:
1452                return self.work.pos, i
1453        raise ValueError("Cannot find the position of %s in flow %s" % (self, self.flow))
1454
1455    @property
1456    def pos_str(self):
1457        """String representation of self.pos"""
1458        return "w" + str(self.pos[0]) + "_t" + str(self.pos[1])
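
    # For instance, the second task of the first work has pos == (0, 1) and
    # pos_str == "w0_t1".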
1459
1460    @property
1461    def num_launches(self):
1462        """
1463        Number of launches performed. This number includes possible ABINIT restarts
1464        as well as possible launches due to errors encountered with the resource manager
1465        or the hardware/software."""
1466        return sum(q.num_launches for q in self.manager.qads)
1467
1468    @property
1469    def input(self):
1470        """AbinitInput object."""
1471        return self._input
1472
1473    def get_inpvar(self, varname, default=None):
1474        """Return the value of the ABINIT variable varname, None if not present."""
1475        return self.input.get(varname, default)
1476
1477    def set_vars(self, *args, **kwargs):
1478        """
1479        Set the values of the ABINIT variables in the input file. Return dict with old values.
1480        """
1481        kwargs.update(dict(*args))
1482        old_values = {vname: self.input.get(vname) for vname in kwargs}
1483        self.input.set_vars(**kwargs)
1484        if kwargs or old_values:
1485            self.history.info("Setting input variables: %s" % str(kwargs))
1486            self.history.info("Old values: %s" % str(old_values))
1487
1488        return old_values
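
    # Example (a sketch, assuming standard ABINIT variables):
    #
    #     old_values = task.set_vars(ecut=10, nband=20)
    #     # old_values maps each variable name to its previous value (None if it was not set).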
1489
1490    @property
1491    def initial_structure(self):
1492        """Initial structure of the task."""
1493        return self.input.structure
1494
1495    def make_input(self, with_header=False):
1496        """Construct the input file of the calculation."""
1497        s = str(self.input)
1498        if with_header: s = str(self) + "\n" + s
1499        return s
1500
1501    def ipath_from_ext(self, ext):
1502        """
1503        Returns the path of the input file with extension ext.
1504        Use it when the file does not exist yet.
1505        """
1506        return os.path.join(self.workdir, self.prefix.idata + "_" + ext)
1507
1508    def opath_from_ext(self, ext):
1509        """
1510        Returns the path of the output file with extension ext.
1511        Use it when the file does not exist yet.
1512        """
1513        return os.path.join(self.workdir, self.prefix.odata + "_" + ext)
1514
1515    @property
1516    @abc.abstractmethod
1517    def executable(self):
1518        """
1519        Path to the executable associated to the task (internally stored in self._executable).
1520        """
1521
1522    def set_executable(self, executable):
1523        """Set the executable associated to this task."""
1524        self._executable = executable
1525
1526    @property
1527    def process(self):
1528        try:
1529            return self._process
1530        except AttributeError:
1531            # Attach a fake process so that we can poll it.
1532            return FakeProcess()
1533
1534    @property
1535    def is_abinit_task(self):
1536        """True if this task is a subclass of AbinitTask."""
1537        return isinstance(self, AbinitTask)
1538
1539    @property
1540    def is_anaddb_task(self):
1541        """True if this task is a subclass of AnaddbTask."""
1542        return isinstance(self, AnaddbTask)
1543
1544    @property
1545    def is_optic_task(self):
1546        """True if this task is a subclass of OpticTask."""
1547        return isinstance(self, OpticTask)
1548
1549    @property
1550    def is_completed(self):
1551        """True if the task has been executed."""
1552        return self.status >= self.S_DONE
1553
1554    @property
1555    def can_run(self):
1556        """The task can run if its status is < S_SUB, it is not locked, and all its dependencies (if any) are done."""
1557        all_ok = all(stat == self.S_OK for stat in self.deps_status)
1558        return self.status < self.S_SUB and self.status != self.S_LOCKED and all_ok
1559
1560    #@check_spectator
1561    def cancel(self):
1562        """Cancel the job. Returns 1 if job was cancelled."""
1563        if self.queue_id is None: return 0
1564        if self.status >= self.S_DONE: return 0
1565
1566        exit_status = self.manager.cancel(self.queue_id)
1567        if exit_status != 0:
1568            self.history.warning("manager.cancel returned exit_status: %s" % exit_status)
1569            return 0
1570
1571        # Remove output files and reset the status.
1572        self.history.info("Job %s cancelled by user" % self.queue_id)
1573        self.reset()
1574        return 1
1575
1576    def with_fixed_mpi_omp(self, mpi_procs, omp_threads):
1577        """
1578        Disable autoparal and force execution with `mpi_procs` MPI processes
1579        and `omp_threads` OpenMP threads. Useful for generating benchmarks.
1580        """
1581        manager = self.manager if hasattr(self, "manager") else self.flow.manager
1582        self.manager = manager.new_with_fixed_mpi_omp(mpi_procs, omp_threads)
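
    # Example (a sketch, useful for benchmarks executed via the shell):
    #
    #     task.with_fixed_mpi_omp(mpi_procs=4, omp_threads=2)
    #     task.start_and_wait(autoparal=False)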
1583
1584    #def set_max_ncores(self, max_ncores):
1585    #    """
1586    #    """
1587    #    manager = self.manager if hasattr(self, "manager") else self.flow.manager
1588    #    self.manager = manager.new_with_max_ncores(mpi_procs, omp_threads)
1589
1590    #@check_spectator
1591    def _on_done(self):
1592        self.fix_ofiles()
1593
1594    #@check_spectator
1595    def _on_ok(self):
1596        # Fix output file names.
1597        self.fix_ofiles()
1598
1599        # Get results
1600        results = self.on_ok()
1601
1602        self.finalized = True
1603
1604        return results
1605
1606    #@check_spectator
1607    def on_ok(self):
1608        """
1609        This method is called once the `Task` has reached status S_OK.
1610        Subclasses should provide their own implementation
1611
1612        Returns:
1613            Dictionary that must contain at least the following entries:
1614                returncode:
1615                    0 on success.
1616                message:
1617                    a string that should provide a human-readable description of what has been performed.
1618        """
1619        return dict(returncode=0, message="Calling on_ok of the base class!")
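
    # Sketch of a possible subclass implementation (hypothetical example):
    #
    #     class MyTask(AbinitTask):
    #         def on_ok(self):
    #             # e.g. post-process or archive the output files in self.outdir here.
    #             return dict(returncode=0, message="Output files post-processed")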
1620
1621    #@check_spectator
1622    def fix_ofiles(self):
1623        """
1624        This method is called when the task reaches S_OK.
1625        It changes the extension of particular output files
1626        produced by Abinit so that the 'official' extension
1627        is preserved e.g. out_1WF14 --> out_1WF
1628        """
1629        filepaths = self.outdir.list_filepaths()
1630        #self.history.info("in fix_ofiles with filepaths %s" % list(filepaths))
1631
1632        old2new = FilepathFixer().fix_paths(filepaths)
1633
1634        for old, new in old2new.items():
1635            self.history.info("will rename old %s to new %s" % (old, new))
1636            os.rename(old, new)
1637
1638    #@check_spectator
1639    def _restart(self, submit=True):
1640        """
1641        Called by restart once we have finished preparing the task for restarting.
1642
1643        Return:
1644            True if task has been restarted
1645        """
1646        self.set_status(self.S_READY, msg="Restarted on %s" % time.asctime())
1647
1648        # Increase the counter.
1649        self.num_restarts += 1
1650        self.history.info("Restarted, num_restarts %d" % self.num_restarts)
1651
1652        # Reset datetimes
1653        self.datetimes.reset()
1654
1655        # Remove the lock file
1656        self.start_lockfile.remove()
1657
1658        if submit:
1659            # Relaunch the task.
1660            fired = self.start()
1661            if not fired: self.history.warning("Restart failed")
1662        else:
1663            fired = False
1664
1665        return fired
1666
1667    #@check_spectator
1668    def restart(self):
1669        """
1670        Restart the calculation.  Subclasses should provide a concrete version that
1671        performs all the actions needed for preparing the restart and then calls self._restart
1672        to restart the task. The default implementation is empty.
1673
1674        Returns:
1675            1 if job was restarted, 0 otherwise.
1676        """
1677        self.history.debug("Calling the **empty** restart method of the base class")
1678        return 0
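
    # Sketch of what a concrete implementation usually does (hypothetical example;
    # it assumes that Directory.has_abiext returns the path of the file with the
    # given ABINIT extension, or an empty string if no such file is found):
    #
    #     class MyScfLikeTask(AbinitTask):
    #         def restart(self):
    #             wfk_path = self.outdir.has_abiext("WFK")
    #             if wfk_path:
    #                 self.out_to_in(wfk_path)    # out_WFK --> in_WFK
    #                 self.set_vars(irdwfk=1)     # Tell ABINIT to read the incoming WFK file.
    #             return self._restart()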
1679
1680    def poll(self):
1681        """Check if child process has terminated. Set and return returncode attribute."""
1682        self._returncode = self.process.poll()
1683
1684        if self._returncode is not None:
1685            self.set_status(self.S_DONE, "status set to DONE")
1686
1687        return self._returncode
1688
1689    def wait(self):
1690        """Wait for child process to terminate. Set and return returncode attribute."""
1691        self._returncode = self.process.wait()
1692        try:
1693            self.process.stderr.close()
1694        except Exception:
1695            pass
1696        self.set_status(self.S_DONE, "status set to DONE")
1697
1698        return self._returncode
1699
1700    def communicate(self, input=None):
1701        """
1702        Interact with process: Send data to stdin. Read data from stdout and stderr, until end-of-file is reached.
1703        Wait for process to terminate. The optional input argument should be a string to be sent to the
1704        child process, or None, if no data should be sent to the child.
1705
1706        communicate() returns a tuple (stdoutdata, stderrdata).
1707        """
1708        stdoutdata, stderrdata = self.process.communicate(input=input)
1709        self._returncode = self.process.returncode
1710        self.set_status(self.S_DONE, "status set to DONE")
1711
1712        return stdoutdata, stderrdata
1713
1714    def kill(self):
1715        """Kill the child."""
1716        self.process.kill()
1717        self.set_status(self.S_ERROR, "status set to ERROR by task.kill")
1718        self._returncode = self.process.returncode
1719
1720    @property
1721    def returncode(self):
1722        """
1723        The child return code, set by poll() and wait() (and indirectly by communicate()).
1724        A None value indicates that the process hasn't terminated yet.
1725        A negative value -N indicates that the child was terminated by signal N (Unix only).
1726        """
1727        try:
1728            return self._returncode
1729        except AttributeError:
1730            return 0
1731
1732    def reset(self):
1733        """
1734        Reset the task status. Mainly used if we made a silly mistake in the initial
1735        setup of the queue manager and we want to fix it and rerun the task.
1736
1737        Returns:
1738            0 on success, 1 if reset failed.
1739        """
1740        # Can only reset tasks that are done.
1741        # One should be able to reset 'Submitted' tasks (sometimes, they are not in the queue
1742        # and we want to restart them)
1743        #if self.status != self.S_SUB and self.status < self.S_DONE: return 1
1744
1745        # Remove output files otherwise the EventParser will think the job is still running
1746        self.output_file.remove()
1747        self.log_file.remove()
1748        self.stderr_file.remove()
1749        self.start_lockfile.remove()
1750        self.qerr_file.remove()
1751        self.qout_file.remove()
1752        if self.mpiabort_file.exists:
1753            self.mpiabort_file.remove()
1754
1755        self.set_status(self.S_INIT, msg="Reset on %s" % time.asctime())
1756        self.num_restarts = 0
1757        self.set_qjob(None)
1758
1759        # Reset finalized flags.
1760        self.work.finalized = False
1761        self.flow.finalized = False
1762
1763        return 0
1764
1765    @property
1766    @return_none_if_raise(AttributeError)
1767    def queue_id(self):
1768        """Queue identifier returned by the Queue manager. None if not set"""
1769        return self.qjob.qid
1770
1771    @property
1772    @return_none_if_raise(AttributeError)
1773    def qname(self):
1774        """Queue name identifier returned by the Queue manager. None if not set"""
1775        return self.qjob.qname
1776
1777    @property
1778    def qjob(self):
1779        return self._qjob
1780
1781    def set_qjob(self, qjob):
1782        """Set info on queue after submission."""
1783        self._qjob = qjob
1784
1785    @property
1786    def has_queue(self):
1787        """True if we are submitting jobs via a queue manager."""
1788        return self.manager.qadapter.QTYPE.lower() != "shell"
1789
1790    @property
1791    def num_cores(self):
1792        """Total number of CPUs used to run the task."""
1793        return self.manager.num_cores
1794
1795    @property
1796    def mpi_procs(self):
1797        """Number of MPI processes used to run the task."""
1798        return self.manager.mpi_procs
1799
1800    @property
1801    def omp_threads(self):
1802        """Number of OpenMP threads used to run the task."""
1803        return self.manager.omp_threads
1804
1805    @property
1806    def mem_per_proc(self):
1807        """Memory per MPI process."""
1808        return Memory(self.manager.mem_per_proc, "Mb")
1809
1810    @property
1811    def status(self):
1812        """Gives the status of the task."""
1813        return self._status
1814
1815    def lock(self, source_node):
1816        """Lock the task, source is the |Node| that applies the lock."""
1817        if self.status != self.S_INIT:
1818            raise ValueError("Trying to lock a task with status %s" % self.status)
1819
1820        self._status = self.S_LOCKED
1821        self.history.info("Locked by node %s", source_node)
1822
1823    def unlock(self, source_node, check_status=True):
1824        """
1825        Unlock the task, set its status to `S_READY` so that the scheduler can submit it.
1826        source_node is the |Node| that removed the lock
1827        Call task.check_status if check_status is True.
1828        """
1829        if self.status != self.S_LOCKED:
1830            raise RuntimeError("Trying to unlock a task with status %s" % self.status)
1831
1832        self._status = self.S_READY
1833        if check_status: self.check_status()
1834        self.history.info("Unlocked by %s", source_node)
1835
1836    #@check_spectator
1837    def set_status(self, status, msg):
1838        """
1839        Set and return the status of the task.
1840
1841        Args:
1842            status: Status object or string representation of the status
1843            msg: string with human-readable message used in the case of errors.
1844        """
1845        # truncate string if it's long. msg will be logged in the object and we don't want to waste memory.
1846        if len(msg) > 2000:
1847            msg = msg[:2000]
1848            msg += "\n... snip ...\n"
1849
1850        # Locked files must be explicitly unlocked
1851        if self.status == self.S_LOCKED or status == self.S_LOCKED:
1852            err_msg = (
1853                 "Locked files must be explicitly unlocked before calling set_status but\n"
1854                 "task.status = %s, input status = %s" % (self.status, status))
1855            raise RuntimeError(err_msg)
1856
1857        status = Status.as_status(status)
1858
1859        changed = True
1860        if hasattr(self, "_status"):
1861            changed = (status != self._status)
1862
1863        self._status = status
1864
1865        if status == self.S_RUN:
1866            # Set datetimes.start when the task enters S_RUN
1867            if self.datetimes.start is None:
1868                self.datetimes.start = datetime.datetime.now()
1869
1870        # Add new entry to history only if the status has changed.
1871        if changed:
1872            if status == self.S_SUB:
1873                self.datetimes.submission = datetime.datetime.now()
1874                self.history.info("Submitted with MPI=%s, Omp=%s, Memproc=%.1f [Gb] %s " % (
1875                    self.mpi_procs, self.omp_threads, self.mem_per_proc.to("Gb"), msg))
1876
1877            elif status == self.S_OK:
1878                self.history.info("Task completed %s", msg)
1879
1880            elif status == self.S_ABICRITICAL:
1881                self.history.info("Status set to S_ABI_CRITICAL due to: %s", msg)
1882
1883            else:
1884                self.history.info("Status changed to %s. msg: %s", status, msg)
1885
1886        #######################################################
1887        # The section below contains callbacks that should not
1888        # be executed if we are in spectator_mode
1889        #######################################################
1890        if status == self.S_DONE:
1891            # Execute the callback
1892            self._on_done()
1893
1894        if status == self.S_OK:
1895            # Finalize the task.
1896            if not self.finalized:
1897                self._on_ok()
1898
1899                # here we remove the output files of the task and of its parents.
1900                if self.gc is not None and self.gc.policy == "task":
1901                    self.clean_output_files()
1902
1903            if self.status == self.S_OK:
1904                # Because _on_ok might have changed the status.
1905                self.send_signal(self.S_OK)
1906
1907        return status
1908
1909    def check_status(self):
1910        """
1911        This function checks the status of the task by inspecting the output and the
1912        error files produced by the application and by the queue manager.
1913        """
1914        # 1) see if the job is blocked
1915        # 2) see if an error occurred while the job was being submitted, TODO these problems can be solved
1916        # 3) see if there is output
1917        # 4) see if abinit reports problems
1918        # 5) see if both err files exist and are empty
1919        # 6) no output and no err files, the job must still be running
1920        # 7) try to find out what caused the problems
1921        # 8) there is a problem but we did not figure out what ...
1922        # 9) the only way of landing here is if there is an output file but no err files...
1923
1924        # 1) A locked task can only be unlocked by calling set_status explicitly.
1925        # an errored task, should not end up here but just to be sure
1926        black_list = (self.S_LOCKED, self.S_ERROR)
1927        #if self.status in black_list: return self.status
1928
1929        # 2) Check the returncode of the job script
1930        if self.returncode != 0:
1931            msg = "job.sh return code: %s\nPerhaps the job was not submitted properly?" % self.returncode
1932            return self.set_status(self.S_QCRITICAL, msg=msg)
1933
1934        # If we have an abort file produced by Abinit
1935        if self.mpiabort_file.exists:
1936            return self.set_status(self.S_ABICRITICAL, msg="Found ABINIT abort file")
1937
1938        # Analyze the stderr file for Fortran runtime errors.
1939        # getsize is 0 if the file is empty or it does not exist.
1940        err_msg = None
1941        if self.stderr_file.getsize() != 0:
1942            err_msg = self.stderr_file.read()
1943
1944        # Analyze the stderr file of the resource manager for runtime errors.
1945        # TODO: Why are we looking for errors in queue.qerr?
1946        qerr_info = None
1947        if self.qerr_file.getsize() != 0:
1948            qerr_info = self.qerr_file.read()
1949
1950        # Analyze the stdout file of the resource manager (needed for PBS !)
1951        qout_info = None
1952        if self.qout_file.getsize():
1953            qout_info = self.qout_file.read()
1954
1955        # Start to check ABINIT status if the output file has been created.
1956        #if self.output_file.getsize() != 0:
1957        if self.output_file.exists:
1958            try:
1959                report = self.get_event_report()
1960            except Exception as exc:
1961                msg = "%s exception while parsing event_report:\n%s" % (self, exc)
1962                return self.set_status(self.S_ABICRITICAL, msg=msg)
1963
1964            if report is None:
1965                return self.set_status(self.S_ERROR, msg="Got None report!")
1966
1967            if report.run_completed:
1968                # Here we set the correct timing data reported by Abinit
1969                self.datetimes.start = report.start_datetime
1970                self.datetimes.end = report.end_datetime
1971
1972                # Check if the calculation converged.
1973                not_ok = report.filter_types(self.CRITICAL_EVENTS)
1974                if not_ok:
1975                    return self.set_status(self.S_UNCONVERGED, msg='status set to UNCONVERGED based on abiout')
1976                else:
1977                    return self.set_status(self.S_OK, msg="status set to OK based on abiout")
1978
1979            # Calculation still running or errors?
1980            if report.errors:
1981                # Abinit reported problems
1982                self.history.debug('Found errors in report')
1983                for error in report.errors:
1984                    self.history.debug(str(error))
1985                    try:
1986                        self.abi_errors.append(error)
1987                    except AttributeError:
1988                        self.abi_errors = [error]
1989
1990                # The job is unfixable due to ABINIT errors
1991                self.history.debug("%s: Found Errors or Bugs in ABINIT main output!" % self)
1992                msg = "\n".join(map(repr, report.errors))
1993                return self.set_status(self.S_ABICRITICAL, msg=msg)
1994
1995            # 5)
1996            if self.stderr_file.exists and not err_msg:
1997                if self.qerr_file.exists and not qerr_info:
1998                    # there is output and no errors
1999                    # The job still seems to be running
2000                    return self.set_status(self.S_RUN, msg='there is output and no errors: job still seems to be running')
2001
2002        # 6)
2003        if not self.output_file.exists:
2004            #self.history.debug("output_file does not exists")
2005            if not self.stderr_file.exists and not self.qerr_file.exists:
2006                # No output at all. The job is still in the queue.
2007                return self.status
2008
2009        # 7) Analyze the files of the resource manager and abinit and execution err (mvs)
2010        # MG: This section has been disabled: several portability issues
2011        # Need more robust logic in error_parser, perhaps logic provided by users via callbacks.
2012        """
2013        if False and (qerr_info or qout_info):
2014            from abipy.flowtk.scheduler_error_parsers import get_parser
2015            scheduler_parser = get_parser(self.manager.qadapter.QTYPE, err_file=self.qerr_file.path,
2016                                          out_file=self.qout_file.path, run_err_file=self.stderr_file.path)
2017
2018            if scheduler_parser is None:
2019                return self.set_status(self.S_QCRITICAL,
2020                                       msg="Cannot find scheduler_parser for qtype %s" % self.manager.qadapter.QTYPE)
2021
2022            scheduler_parser.parse()
2023
2024            if scheduler_parser.errors:
2025                # Store the queue errors in the task
2026                self.queue_errors = scheduler_parser.errors
2027                # The job is killed or crashed and we know what happened
2028                msg = "scheduler errors found:\n%s" % str(scheduler_parser.errors)
2029                return self.set_status(self.S_QCRITICAL, msg=msg)
2030
2031            elif lennone(qerr_info) > 0:
2032                # if only qout_info, we are not necessarily in QCRITICAL state,
2033                # since there will always be info in the qout file
2034                self.history.info('Found unknown message in the queue qerr file: %s' % str(qerr_info))
2035                #try:
2036                #    rt = self.datetimes.get_runtime().seconds
2037                #except Exception:
2038                #    rt = -1.0
2039                #tl = self.manager.qadapter.timelimit
2040                #if rt > tl:
2041                #    msg += 'set to error : runtime (%s) exceded walltime (%s)' % (rt, tl)
2042                #    print(msg)
2043                #    return self.set_status(self.S_ERROR, msg=msg)
2044                # The job may be killed or crashed but we don't know what happened
2045                # It may also be that an innocent message was written to qerr, so we wait for a while
2046                # it is set to QCritical, we will attempt to fix it by running on more resources
2047        """
2048
2049        # 8) analyzing the err files and the abinit output did not identify a problem
2050        # but if the files are not empty we do have a problem with no way of solving it:
2051        # the job was killed or crashed but we don't know what happened.
2052        # it is set to QCritical, we will attempt to fix it by running on more resources
2053        if err_msg:
2054            msg = 'Found error message:\n %s' % str(err_msg)
2055            self.history.warning(msg)
2056            #return self.set_status(self.S_QCRITICAL, msg=msg)
2057
2058        # 9) if we still haven't returned there is no indication of any error and the job can only still be running
2059        # but we should actually never land here, or we have delays in the file system ....
2060        # print('the job still seems to be running maybe it is hanging without producing output... ')
2061
2062        # Check time of last modification.
2063        if self.output_file.exists and \
2064           (time.time() - self.output_file.get_stat().st_mtime > self.manager.policy.frozen_timeout):
2065            msg = "Task seems to be frozen, last change more than %s [s] ago" % self.manager.policy.frozen_timeout
2066            return self.set_status(self.S_ERROR, msg=msg)
2067
2068        # Handle weird case in which either run.abo, or run.log have not been produced
2069        #if self.status not in (self.S_INIT, self.S_READY) and (not self.output.file.exists or not self.log_file.exits):
2070        #    msg = "Task have been submitted but cannot find the log file or the output file"
2071        #    return self.set_status(self.S_ERROR, msg)
2072
2073        return self.set_status(self.S_RUN, msg='final option: nothing seems to be wrong, the job must still be running')
2074
2075    def reduce_memory_demand(self):
2076        """
2077        Method that can be called by the scheduler to decrease the memory demand of a specific task.
2078        Returns True in case of success, False in case of Failure.
2079        Should be overwritten by specific tasks.
2080        """
2081        return False
2082
2083    def speed_up(self):
2084        """
2085        Method that can be called by the flow to decrease the time needed for a specific task.
2086        Returns True in case of success, False in case of Failure
2087        Should be overwritten by specific tasks.
2088        """
2089        return False
2090
2091    def out_to_in(self, out_file):
2092        """
2093        Move an output file to the input data directory of the `Task`
2094        and rename the file so that ABINIT will read it as an input data file.
2095
2096        Returns:
2097            The absolute path of the new file in the indata directory.
2098        """
2099        in_file = os.path.basename(out_file).replace("out", "in", 1)
2100        dest = os.path.join(self.indir.path, in_file)
2101
2102        if os.path.exists(dest) and not os.path.islink(dest):
2103            self.history.warning("Will overwrite %s with %s" % (dest, out_file))
2104
2105        os.rename(out_file, dest)
2106        return dest
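
    # For example, out_to_in("/flow/w0/t0/outdata/out_DEN") moves the file to the
    # indata directory of this task and returns ".../indata/in_DEN".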
2107
2108    def inlink_file(self, filepath):
2109        """
2110        Create a symbolic link to the specified file in the
2111        directory containing the input files of the task.
2112        """
2113        if not os.path.exists(filepath):
2114            self.history.debug("Creating symbolic link to non-existent file %s" % filepath)
2115
2116        # Extract the Abinit extension and add the prefix for input files.
2117        root, abiext = abi_splitext(filepath)
2118
2119        infile = "in_" + abiext
2120        infile = self.indir.path_in(infile)
2121
2122        # Link path to dest if dest link does not exist.
2123        # else check that it points to the expected file.
2124        self.history.info("Linking path %s --> %s" % (filepath, infile))
2125
2126        if not os.path.exists(infile):
2127            os.symlink(filepath, infile)
2128        else:
2129            if os.path.realpath(infile) != filepath:
2130                raise self.Error("infile %s does not point to filepath %s" % (infile, filepath))
2131
2132    def make_links(self):
2133        """
2134        Create symbolic links to the output files produced by the other tasks.
2135
2136        .. warning::
2137
2138            This method should be called only when the calculation is READY because
2139            it uses a heuristic approach to find the file to link.
2140        """
2141        for dep in self.deps:
2142            filepaths, exts = dep.get_filepaths_and_exts()
2143
2144            for path, ext in zip(filepaths, exts):
2145                self.history.info("Need path %s with ext %s" % (path, ext))
2146                dest = self.ipath_from_ext(ext)
2147
2148                if not os.path.exists(path):
2149                    # Try netcdf file.
2150                    # TODO: this case should be treated in a cleaner way.
2151                    path += ".nc"
2152                    if os.path.exists(path): dest += ".nc"
2153
2154                if not os.path.exists(path):
2155                    raise self.Error("\n%s: path `%s`\n is needed by this task but it does not exist" % (self, path))
2156
2157                if path.endswith(".nc") and not dest.endswith(".nc"):  # Source is netcdf --> dest must be netcdf as well.
2158                    dest += ".nc"
2159
2160                # Link path to dest if dest link does not exist.
2161                # else check that it points to the expected file.
2162                self.history.debug("Linking path %s --> %s" % (path, dest))
2163                if not os.path.exists(dest):
2164                    os.symlink(path, dest)
2165                else:
2166                    # check links but only if we haven't performed the restart.
2167                    # in this case, indeed we may have replaced the file pointer with the
2168                    # previous output file of the present task.
2169                    if os.path.realpath(dest) != path and self.num_restarts == 0:
2170                        raise self.Error("\nDestination:\n `%s`\ndoes not point to path:\n `%s`" % (dest, path))
2171
2172    @abc.abstractmethod
2173    def setup(self):
2174        """Public method called before submitting the task."""
2175
2176    def _setup(self):
2177        """
2178        This method calls self.setup after having performed additional operations
2179        such as the creation of the symbolic links needed to connect different tasks.
2180        """
2181        self.make_links()
2182        self.setup()
2183
2184    def get_event_report(self, source="log"):
2185        """
2186        Analyzes the main logfile of the calculation for possible Errors or Warnings.
2187        If the ABINIT abort file is found, the errors found in this file are added to
2188        the output report.
2189
2190        Args:
2191            source: "output" for the main output file, "log" for the log file.
2192
2193        Returns:
2194            :class:`EventReport` instance or None if the source file does not exist.
2195        """
2196        # By default, we inspect the main log file.
2197        ofile = {
2198            "output": self.output_file,
2199            "log": self.log_file}[source]
2200
2201        parser = events.EventsParser()
2202
2203        if not ofile.exists:
2204            if not self.mpiabort_file.exists:
2205                return None
2206            else:
2207                # ABINIT abort file without log!
2208                abort_report = parser.parse(self.mpiabort_file.path)
2209                return abort_report
2210
2211        try:
2212            report = parser.parse(ofile.path)
2213            #self._prev_reports[source] = report
2214
2215            # Add events found in the ABI_MPIABORTFILE.
2216            if self.mpiabort_file.exists:
2217                self.history.critical("Found ABI_MPIABORTFILE!!!!!")
2218                abort_report = parser.parse(self.mpiabort_file.path)
2219                if len(abort_report) != 1:
2220                    self.history.critical("Found more than one event in ABI_MPIABORTFILE")
2221
2222                # Weird case: empty abort file, let's skip the part
2223                # below and hope that the log file contains the error message.
2224                #if not len(abort_report): return report
2225
2226                # Append the last abort event found in the ABI_MPIABORTFILE to the report.
2227                last_abort_event = abort_report[-1]
2228                report.append(last_abort_event)
2233
2234            return report
2235
2236        #except parser.Error as exc:
2237        except Exception as exc:
2238            # Return a report with an error entry with info on the exception.
2239            msg = "%s: Exception while parsing ABINIT events:\n %s" % (ofile, str(exc))
2240            self.set_status(self.S_ABICRITICAL, msg=msg)
2241            return parser.report_exception(ofile.path, exc)
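
    # Example (a sketch):
    #
    #     report = task.get_event_report(source="log")
    #     if report is not None and report.errors:
    #         for error in report.errors:
    #             print(error)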
2242
2243    def get_results(self, **kwargs):
2244        """
2245        Returns :class:`NodeResults` instance.
2246        Subclasses should extend this method (if needed) by adding
2247        specialized code that performs some kind of post-processing.
2248        """
2249        # Check whether the process completed.
2250        if self.returncode is None:
2251            raise self.Error("return code is None, you should call wait, communicate or poll")
2252
2253        if self.status is None or self.status < self.S_DONE:
2254            raise self.Error("Task is not completed")
2255
2256        return self.Results.from_node(self)
2257
2258    def move(self, dest, is_abspath=False):
2259        """
2260        Recursively move self.workdir to another location. This is similar to the Unix "mv" command.
2261        The destination path must not already exist. If the destination already exists
2262        but is not a directory, it may be overwritten depending on os.rename() semantics.
2263
2264        By default, dest is located in the parent directory of self.workdir.
2265        Use is_abspath=True to specify an absolute path.
2266        """
2267        if not is_abspath:
2268            dest = os.path.join(os.path.dirname(self.workdir), dest)
2269
2270        shutil.move(self.workdir, dest)
2271
2272    def in_files(self):
2273        """Return all the input data files used."""
2274        return self.indir.list_filepaths()
2275
2276    def out_files(self):
2277        """Return all the output data files produced."""
2278        return self.outdir.list_filepaths()
2279
2280    def tmp_files(self):
2281        """Return all the temporary data files produced."""
2282        return self.tmpdir.list_filepaths()
2283
2284    def path_in_workdir(self, filename):
2285        """Create the absolute path of filename in the top-level working directory."""
2286        return os.path.join(self.workdir, filename)
2287
2288    def rename(self, src_basename, dest_basename, datadir="outdir"):
2289        """
2290        Rename a file located in datadir.
2291
2292        src_basename and dest_basename are the basename of the source file
2293        and of the destination file, respectively.
2294        """
2295        directory = {
2296            "indir": self.indir,
2297            "outdir": self.outdir,
2298            "tmpdir": self.tmpdir,
2299        }[datadir]
2300
2301        src = directory.path_in(src_basename)
2302        dest = directory.path_in(dest_basename)
2303
2304        os.rename(src, dest)
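
    # Example (a sketch; the basenames are hypothetical):
    #
    #     task.rename("out_DEN", "out_DEN_backup", datadir="outdir")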
2305
2306    #@check_spectator
2307    def build(self, *args, **kwargs):
2308        """
2309        Creates the working directory and the input files of the |Task|.
2310        It does not overwrite the files file if it already exists.
2311        """
2312        # Create dirs for input, output and tmp data.
2313        self.indir.makedirs()
2314        self.outdir.makedirs()
2315        self.tmpdir.makedirs()
2316
2317        # Write files file and input file.
2318        if not self.files_file.exists:
2319            self.files_file.write(self.filesfile_string)
2320
2321        self.input_file.write(self.make_input())
2322        self.manager.write_jobfile(self)
2323
2324    #@check_spectator
2325    def rmtree(self, exclude_wildcard=""):
2326        """
2327        Remove all files and directories in the working directory
2328
2329        Args:
2330            exclude_wildcard: Optional string with shell-style wildcard patterns separated by |.
2331                Files matching one of the patterns will be preserved.
2332                example: exclude_wildcard="*.nc|*.txt" preserves all the files whose extension is in ["nc", "txt"].
2333        """
2334        if not exclude_wildcard:
2335            shutil.rmtree(self.workdir)
2336
2337        else:
2338            w = WildCard(exclude_wildcard)
2339
2340            for dirpath, dirnames, filenames in os.walk(self.workdir):
2341                for fname in filenames:
2342                    filepath = os.path.join(dirpath, fname)
2343                    if not w.match(fname):
2344                        os.remove(filepath)
2345
2346    def remove_files(self, *filenames):
2347        """Remove all the files listed in filenames."""
2348        filenames = list_strings(filenames)
2349
2350        for dirpath, dirnames, fnames in os.walk(self.workdir):
2351            for fname in fnames:
2352                if fname in filenames:
2353                    filepath = os.path.join(dirpath, fname)
2354                    os.remove(filepath)
2355
2356    def clean_output_files(self, follow_parents=True):
2357        """
2358        This method is called when the task reaches S_OK. It removes all the output files
2359        produced by the task that are not needed by its children as well as the output files
2360        produced by its parents if no other node needs them.
2361
2362        Args:
2363            follow_parents: If True, the output files of the parent nodes will be removed if possible.
2364
2365        Return:
2366            list with the absolute paths of the files that have been removed.
2367        """
2368        paths = []
2369        if self.status != self.S_OK:
2370            self.history.warning("Calling task.clean_output_files on a task whose status != S_OK")
2371
2372        # Remove all files in tmpdir.
2373        self.tmpdir.clean()
2374
2375        # Find the file extensions that should be preserved since these files are still
2376        # needed by the children who haven't reached S_OK
2377        except_exts = set()
2378        for child in self.get_children():
2379            if child.status == self.S_OK: continue
2380            # Find the position of self in child.deps and add the extensions.
2381            i = [dep.node for dep in child.deps].index(self)
2382            except_exts.update(child.deps[i].exts)
2383
2384        # Remove the files in the outdir of the task but keep except_exts.
2385        exts = self.gc.exts.difference(except_exts)
2386        #print("Will remove its extensions: ", exts)
2387        paths += self.outdir.remove_exts(exts)
2388        if not follow_parents: return paths
2389
2390        # Remove the files in the outdir of my parents if all the possible dependencies have been fulfilled.
2391        for parent in self.get_parents():
2392
2393            # Here we build a dictionary file extension --> list of child nodes requiring this file from parent
2394            # e.g {"WFK": [node1, node2]}
2395            ext2nodes = collections.defaultdict(list)
2396            for child in parent.get_children():
2397                if child.status == child.S_OK: continue
2398                i = [d.node for d in child.deps].index(parent)
2399                for ext in child.deps[i].exts:
2400                    ext2nodes[ext].append(child)
2401
2402            # Remove extension only if no node depends on it!
2403            except_exts = [k for k, lst in ext2nodes.items() if lst]
2404            exts = self.gc.exts.difference(except_exts)
2405            #print("%s removes extensions %s from parent node %s" % (self, exts, parent))
2406            paths += parent.outdir.remove_exts(exts)
2407
2408        self.history.info("Removed files: %s" % paths)
2409        return paths
2410
2411    def setup(self):  # noqa: E731,F811
2412        """Base class does not provide any hook."""
2413
2414    #@check_spectator
2415    def start(self, **kwargs):
2416        """
2417        Starts the calculation by performing the following steps:
2418
2419            - build dirs and files
2420            - call the _setup method
2421            - execute the job file by executing/submitting the job script.
2422
2423        Main entry point for the `Launcher`.
2424
2425        ==============  ==============================================================
2426        kwargs          Meaning
2427        ==============  ==============================================================
2428        autoparal       False to skip the autoparal step (default True)
2429        exec_args       List of arguments passed to executable.
2430        ==============  ==============================================================
2431
2432        Returns:
2433            1 if task was started, 0 otherwise.
2434        """
2435        if self.status >= self.S_SUB:
2436            raise self.Error("Task status: %s" % str(self.status))
2437
2438        if self.start_lockfile.exists:
2439            self.history.warning("Found lock file: %s" % self.start_lockfile.path)
2440            return 0
2441
2442        self.start_lockfile.write("Started on %s" % time.asctime())
2443
2444        self.build()
2445        self._setup()
2446
2447        # Add the variables needed to connect the node.
2448        for d in self.deps:
2449            cvars = d.connecting_vars()
2450            self.history.info("Adding connecting vars %s" % cvars)
2451            self.set_vars(cvars)
2452
2453            # Get (python) data from other nodes
2454            d.apply_getters(self)
2455
2456        # Automatic parallelization
2457        if kwargs.pop("autoparal", True) and hasattr(self, "autoparal_run"):
2458            try:
2459                self.autoparal_run()
2460            #except QueueAdapterError as exc:
2461            #    # If autoparal cannot find a qadapter to run the calculation raises an Exception
2462            #    self.history.critical(exc)
2463            #    msg = "Error while trying to run autoparal in task:%s\n%s" % (repr(task), straceback())
2464            #    cprint(msg, "yellow")
2465            #    self.set_status(self.S_QCRITICAL, msg=msg)
2466            #    return 0
2467            except Exception as exc:
2468                # Sometimes autoparal_run fails because Abinit aborts
2469                # at the level of the parser e.g. cannot find the spacegroup
2470                # due to some numerical noise in the structure.
2471                # In this case we call fix_abicritical and then we try to run autoparal again.
2472                self.history.critical("First call to autoparal failed with `%s`. Will try fix_abicritical" % exc)
2473                msg = "autoparal_fake_run raised:\n%s" % straceback()
2474                self.history.critical(msg)
2475
2476                fixed = self.fix_abicritical()
2477                if not fixed:
2478                    self.set_status(self.S_ABICRITICAL, msg="fix_abicritical could not solve the problem")
2479                    return 0
2480
2481                try:
2482                    self.autoparal_run()
2483                    self.history.info("Second call to autoparal succeeded!")
2484                    #cprint("Second call to autoparal succeeded!", "green")
2485
2486                except Exception as exc:
2487                    self.history.critical("Second call to autoparal failed with %s. Cannot recover!", exc)
2488                    msg = "Tried autoparal again but got:\n%s" % straceback()
2489                    cprint(msg, "red")
2490                    self.set_status(self.S_ABICRITICAL, msg=msg)
2491                    return 0
2492
2493        # Start the calculation in a subprocess and return.
2494        self._process = self.manager.launch(self, **kwargs)
2495        return 1
2496
2497    def start_and_wait(self, *args, **kwargs):
2498        """
2499        Helper method to start the task and wait for completion.
2500
2501        Mainly used when we are submitting the task via the shell without passing through a queue manager.
2502        """
2503        self.start(*args, **kwargs)
2504        retcode = self.wait()
2505        return retcode
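
    # Example (a sketch; mainly useful for small calculations executed via the shell):
    #
    #     retcode = task.start_and_wait(autoparal=False)
    #     print(task.status, retcode)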
2506
2507    def get_graphviz(self, engine="automatic", graph_attr=None, node_attr=None, edge_attr=None):
2508        """
2509        Generate task graph in the DOT language (only parents and children of this task).
2510
2511        Args:
2512            engine: ['dot', 'neato', 'twopi', 'circo', 'fdp', 'sfdp', 'patchwork', 'osage']
2513            graph_attr: Mapping of (attribute, value) pairs for the graph.
2514            node_attr: Mapping of (attribute, value) pairs set for all nodes.
2515            edge_attr: Mapping of (attribute, value) pairs set for all edges.
2516
2517        Returns: graphviz.Digraph <https://graphviz.readthedocs.io/en/stable/api.html#digraph>
2518        """
2519        # https://www.graphviz.org/doc/info/
2520        from graphviz import Digraph
2521        fg = Digraph("task", # filename="task_%s.gv" % os.path.basename(self.workdir),
2522            engine="dot" if engine == "automatic" else engine)
2523
2524        # Set graph attributes.
2525        #fg.attr(label="%s@%s" % (self.__class__.__name__, self.relworkdir))
2526        fg.attr(label=repr(self))
2527        #fg.attr(fontcolor="white", bgcolor='purple:pink')
2528        #fg.attr(rankdir="LR", pagedir="BL")
2529        #fg.attr(constraint="false", pack="true", packMode="clust")
2530        fg.node_attr.update(color='lightblue2', style='filled')
2531
2532        # Add input attributes.
2533        if graph_attr is not None:
2534            fg.graph_attr.update(**graph_attr)
2535        if node_attr is not None:
2536            fg.node_attr.update(**node_attr)
2537        if edge_attr is not None:
2538            fg.edge_attr.update(**edge_attr)
2539
2540        def node_kwargs(node):
2541            return dict(
2542                #shape="circle",
2543                color=node.color_hex,
2544                label=(str(node) if not hasattr(node, "pos_str") else
2545                    node.pos_str + "\n" + node.__class__.__name__),
2546            )
2547
2548        edge_kwargs = dict(arrowType="vee", style="solid")
2549        cluster_kwargs = dict(rankdir="LR", pagedir="BL", style="rounded", bgcolor="azure2")
2550
2551        # Build cluster with tasks.
2552        cluster_name = "cluster%s" % self.work.name
2553        with fg.subgraph(name=cluster_name) as wg:
2554            wg.attr(**cluster_kwargs)
2555            wg.attr(label="%s (%s)" % (self.__class__.__name__, self.name))
2556            wg.node(self.name, **node_kwargs(self))
2557
2558            # Connect task to children.
2559            for child in self.get_children():
2560                # Test if child is in the same work.
2561                myg = wg if child in self.work else fg
2562                myg.node(child.name, **node_kwargs(child))
2563                # Find file extensions required by this task
2564                i = [dep.node for dep in child.deps].index(self)
2565                edge_label = "+".join(child.deps[i].exts)
2566                myg.edge(self.name, child.name, label=edge_label, color=self.color_hex,
2567                         **edge_kwargs)
2568
2569            # Connect task to parents
2570            for parent in self.get_parents():
2571                # Test if parent is in the same work.
2572                myg = wg if parent in self.work else fg
2573                myg.node(parent.name, **node_kwargs(parent))
2574                # Find file extensions required by self (task)
2575                i = [dep.node for dep in self.deps].index(parent)
2576                edge_label = "+".join(self.deps[i].exts)
2577                myg.edge(parent.name, self.name, label=edge_label, color=parent.color_hex,
2578                         **edge_kwargs)
2579
2580        # Treat the case in which we have a work producing output for other tasks.
2581        #for work in self:
2582        #    children = work.get_children()
2583        #    if not children: continue
2584        #    cluster_name = "cluster%s" % work.name
2585        #    seen = set()
2586        #    for child in children:
2587        #        # This is not needed, too much confusing
2588        #        #fg.edge(cluster_name, child.name, color=work.color_hex, **edge_kwargs)
2589        #        # Find file extensions required by work
2590        #        i = [dep.node for dep in child.deps].index(work)
2591        #        for ext in child.deps[i].exts:
2592        #            out = "%s (%s)" % (ext, work.name)
2593        #            fg.node(out)
2594        #            fg.edge(out, child.name, **edge_kwargs)
2595        #            key = (cluster_name, out)
2596        #            if key not in seen:
2597        #                fg.edge(cluster_name, out, color=work.color_hex, **edge_kwargs)
2598        #                seen.add(key)
2599
2600        return fg
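
    # Example (a sketch; requires the graphviz Python package):
    #
    #     graph = task.get_graphviz()
    #     graph.view()    # or graph.render("task_graph", format="png")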
2601
2602
2603class DecreaseDemandsError(Exception):
2604    """
2605    Exception raised by a task if a request to decrease some demand (load or memory) could not be fulfilled.
2606    """
2607
2608
2609class AbinitTask(Task):
2610    """
2611    Base class defining an ABINIT calculation
2612    """
2613    Results = TaskResults
2614
2615    @classmethod
2616    def from_input(cls, input, workdir=None, manager=None):
2617        """
2618        Create an instance of `AbinitTask` from an ABINIT input.
2619
2620        Args:
2621            input: |AbinitInput| object.
2622            workdir: Path to the working directory.
2623            manager: |TaskManager| object.
2624        """
2625        return cls(input, workdir=workdir, manager=manager)
2626
2627    @classmethod
2628    def temp_shell_task(cls, inp, mpi_procs=1, workdir=None, manager=None):
2629        """
2630        Build a Task with a temporary workdir. The task is executed via the shell with 1 MPI proc.
2631        Mainly used for invoking Abinit to get important parameters needed to prepare the real task.
2632
2633        Args:
2634            mpi_procs: Number of MPI processes to use.
2635        """
2636        # Build a simple manager to run the job in a shell subprocess
2637        # Allow users to specify the temporary directory via env variable.
2638        dir = os.getenv("ABIPY_TMPDIR", default=None)
2639        import tempfile
2640        workdir = tempfile.mkdtemp(dir=dir) if workdir is None else workdir
2641        if manager is None: manager = TaskManager.from_user_config()
2642
2643        # Construct the task and run it
2644        task = cls.from_input(inp, workdir=workdir, manager=manager.to_shell_manager(mpi_procs=mpi_procs))
2645        task.set_name('temp_shell_task')
2646        return task
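
    # Example (a sketch, assuming `inp` is a complete |AbinitInput|):
    #
    #     task = AbinitTask.temp_shell_task(inp, mpi_procs=1)
    #     task.start_and_wait(autoparal=False)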
2647
2648    def setup(self):
2649        """
2650        Abinit has the very *bad* habit of changing the file extension by appending the characters in [A, B, ..., Z]
2651        to the output file, and this breaks a lot of code that relies on the use of a unique file extension.
2652        Here we fix this issue by renaming run.abo to run.abo_[number] if the output file "run.abo" already
2653        exists. A few lines of code in python, a lot of problems if you try to implement this trick in Fortran90.
2654        """
2655        def rename_file(afile):
2656            """Helper function to rename :class:`File` objects. Return string for logging purpose."""
2657            # Find the index of the last file (if any).
2658            # TODO: Maybe it's better to use run.abo --> run(1).abo
2659            fnames = [f for f in os.listdir(self.workdir) if f.startswith(afile.basename)]
2660            nums = [int(f) for f in [f.split("_")[-1] for f in fnames] if f.isdigit()]
2661            last = max(nums) if nums else 0
2662            new_path = afile.path + "_" + str(last+1)
2663
2664            os.rename(afile.path, new_path)
2665            return "Will rename %s to %s" % (afile.path, new_path)
2666
2667        logs = []
2668        if self.output_file.exists: logs.append(rename_file(self.output_file))
2669        if self.log_file.exists: logs.append(rename_file(self.log_file))
2670
2671        if logs:
2672            self.history.info("\n".join(logs))
2673
2674    @property
2675    def executable(self):
2676        """Path to the executable required for running the Task."""
2677        try:
2678            return self._executable
2679        except AttributeError:
2680            return "abinit"
2681
2682    @property
2683    def pseudos(self):
2684        """List of pseudos used in the calculation."""
2685        return self.input.pseudos
2686
2687    @property
2688    def isnc(self):
2689        """True if norm-conserving calculation."""
2690        return self.input.isnc
2691
2692    @property
2693    def ispaw(self):
2694        """True if PAW calculation"""
2695        return self.input.ispaw
2696
2697    @property
2698    def is_gs_task(self):
2699        """True if task is GsTask subclass."""
2700        return isinstance(self, GsTask)
2701
2702    @property
2703    def is_dfpt_task(self):
2704        """True if task is a DfptTask subclass."""
2705        return isinstance(self, DfptTask)
2706
2707    @lazy_property
2708    def cycle_class(self):
2709        """
2710        Return the subclass of ScfCycle associated to the task or
2711        None if no SCF algorithm is associated to the task.
2712        """
2713        if isinstance(self, RelaxTask):
2714            return abiinspect.Relaxation
2715        elif isinstance(self, GsTask):
2716            return abiinspect.GroundStateScfCycle
2717        elif self.is_dfpt_task:
2718            return abiinspect.D2DEScfCycle
2719
2720        return None
2721
2722    @property
2723    def filesfile_string(self):
2724        """String with the list of files and prefixes needed to execute ABINIT."""
2725        lines = []
2726        app = lines.append
2727        pj = os.path.join
2728
2729        app(self.input_file.path)                 # Path to the input file
2730        app(self.output_file.path)                # Path to the output file
2731        app(pj(self.workdir, self.prefix.idata))  # Prefix for input data
2732        app(pj(self.workdir, self.prefix.odata))  # Prefix for output data
2733        app(pj(self.workdir, self.prefix.tdata))  # Prefix for temporary data
2734
2735        # Paths to the pseudopotential files.
2736        # Note that here the pseudos **must** be sorted according to znucl.
2737        # Here we reorder the pseudos if the order is wrong.
2738        ord_pseudos = []
2739
2740        znucl = [specie.number for specie in self.input.structure.species_by_znucl]
2741
2742        for z in znucl:
2743            for p in self.pseudos:
2744                if p.Z == z:
2745                    ord_pseudos.append(p)
2746                    break
2747            else:
2748                raise ValueError("Cannot find pseudo with znucl %s in pseudos:\n%s" % (z, self.pseudos))
2749
2750        for pseudo in ord_pseudos:
2751            app(pseudo.path)
2752
2753        return "\n".join(lines)
2754
2755    def set_pconfs(self, pconfs):
2756        """Set the list of autoparal configurations."""
2757        self._pconfs = pconfs
2758
2759    @property
2760    def pconfs(self):
2761        """List of autoparal configurations."""
2762        try:
2763            return self._pconfs
2764        except AttributeError:
2765            return None
2766
2767    def uses_paral_kgb(self, value=1):
2768        """True if the task is a GS Task and uses paral_kgb with the given value."""
2769        paral_kgb = self.get_inpvar("paral_kgb", 0)
2770        # paral_kgb is used only in the GS part.
2771        return paral_kgb == value and isinstance(self, GsTask)
2772
2773    def _change_structure(self, new_structure):
2774        """Change the input structure."""
2775        # Compare new and old structure for logging purpose.
2776        # TODO: Write method of structure to compare self and other and return a dictionary
2777        old_structure = self.input.structure
2778        old_lattice = old_structure.lattice
2779
2780        abc_diff = np.array(new_structure.lattice.abc) - np.array(old_lattice.abc)
2781        angles_diff = np.array(new_structure.lattice.angles) - np.array(old_lattice.angles)
2782        cart_diff = new_structure.cart_coords - old_structure.cart_coords
2783        displs = np.array([np.sqrt(np.dot(v, v)) for v in cart_diff])
2784
2785        recs, tol_angle, tol_length = [], 10**-2, 10**-5
2786
2787        if np.any(np.abs(angles_diff) > tol_angle):
2788            recs.append("new_angles - old_angles = %s" % angles_diff)
2789
2790        if np.any(np.abs(abc_diff) > tol_length):
2791            recs.append("new_abc - old_abc = %s" % abc_diff)
2792
2793        if np.any(np.abs(displs) > tol_length):
2794            min_pos, max_pos = displs.argmin(), displs.argmax()
2795            recs.append("Mean displ: %.2E, Max_displ: %.2E (site %d), min_displ: %.2E (site %d)" %
2796                (displs.mean(), displs[max_pos], max_pos, displs[min_pos], min_pos))
2797
2798        self.history.info("Changing structure (only significant diffs are shown):")
2799        if not recs:
2800            self.history.info("Input and output structures seem to be equal within the given tolerances")
2801        else:
2802            for rec in recs:
2803                self.history.info(rec)
2804
2805        self.input.set_structure(new_structure)
2806        #assert self.input.structure == new_structure
2807
2808    def autoparal_run(self):
2809        """
2810        Find an optimal set of parameters for the execution of the task.
2811        This method can change the ABINIT input variables and/or the
2812        submission parameters e.g. the number of CPUs for MPI and OpenMP.
2813
2814        Set:
2815           self.pconfs, where pconfs is a :class:`ParalHints` object with the configurations reported by
2816           autoparal; the optimal configuration is then selected according to the policy.
2817           Returns 0 if success.
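
        Example (illustrative sketch, assuming ``task`` belongs to a flow whose manager
        has autoparal enabled)::

            retcode = task.autoparal_run()
            if retcode == 0 and task.pconfs is not None:
                print(task.pconfs)   # parallel configurations reported by autoparal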
2818        """
2819        policy = self.manager.policy
2820
2821        if policy.autoparal == 0: # or policy.max_ncpus in [None, 1]:
2822            self.history.info("Nothing to do in autoparal, returning 0")
2823            return 0
2824
2825        if policy.autoparal != 1:
2826            raise NotImplementedError("autoparal != 1")
2827
2828        ############################################################################
2829        # Run ABINIT in sequential to get the possible configurations with max_ncpus
2830        ############################################################################
2831
2832        # Set the variables for automatic parallelization
2833        # Will get all the possible configurations up to max_ncpus
2834        # Return immediately if max_ncpus == 1
2835        max_ncpus = self.manager.max_cores
2836        if max_ncpus == 1: return 0
2837
2838        autoparal_vars = dict(autoparal=policy.autoparal, max_ncpus=max_ncpus, mem_test=0)
2839        self.set_vars(autoparal_vars)
2840
2841        # Run the job in a shell subprocess with mpi_procs = 1
2842        # we don't want to make a request to the queue manager for this simple job!
2843        # Return code is always != 0
2844        process = self.manager.to_shell_manager(mpi_procs=1).launch(self)
2845        self.history.pop()
2846        retcode = process.wait()
2847        # To avoid: ResourceWarning: unclosed file <_io.BufferedReader name=87> in py3k
2848        process.stderr.close()
2849        #process.stdout.close()
2850
2851        # Remove the variables added for the automatic parallelization
2852        self.input.remove_vars(list(autoparal_vars.keys()))
2853
2854        ##############################################################
2855        # Parse the autoparal configurations from the main output file
2856        ##############################################################
2857        parser = ParalHintsParser()
2858        try:
2859            pconfs = parser.parse(self.output_file.path)
2860        except parser.Error:
2861            # In principle Abinit should have written a complete log file
2862            # because we called .wait() but sometimes the Yaml doc is incomplete and
2863            # the parser raises. Let's wait 5 secs and then try again.
2864            time.sleep(5)
2865            try:
2866                pconfs = parser.parse(self.output_file.path)
2867            except parser.Error:
2868                self.history.critical("Error while parsing Autoparal section:\n%s" % straceback())
2869                return 2
2870
2871        if "paral_kgb" not in self.input:
2872            self.input.set_vars(paral_kgb=pconfs.info.get("paral_kgb", 0))
2873
2874        ######################################################
2875        # Select the optimal configuration according to policy
2876        ######################################################
2877        optconf = self.find_optconf(pconfs)
2878
2879        ####################################################
2880        # Change the input file and/or the submission script
2881        ####################################################
2882        self.set_vars(optconf.vars)
2883
2884        # Write autoparal configurations to JSON file.
2885        d = pconfs.as_dict()
2886        d["optimal_conf"] = optconf
2887        json_pretty_dump(d, os.path.join(self.workdir, "autoparal.json"))
2888
2889        ##############
2890        # Finalization
2891        ##############
2892        # Reset the status, remove garbage files ...
2893        self.set_status(self.S_INIT, msg='finished autoparal run')
2894
2895        # Remove the output file since Abinit likes to create new files
2896        # with extension .outA, .outB if the file already exists.
2897        os.remove(self.output_file.path)
2898        os.remove(self.log_file.path)
2899        os.remove(self.stderr_file.path)
2900
2901        return 0
2902
2903    def find_optconf(self, pconfs):
2904        """Find the optimal Parallel configuration."""
2905        # Save pconfs for future reference.
2906        self.set_pconfs(pconfs)
2907
2908        # Select the partition on which we'll be running and set MPI/OMP cores.
2909        optconf = self.manager.select_qadapter(pconfs)
2910        return optconf
2911
2912    def select_files(self, what="o"):
2913        """
2914        Helper function used to select the files of a task.
2915
2916        Args:
2917            what: string with the list of characters selecting the file type
2918                  Possible choices:
2919                  i ==> input_file,
2920                  o ==> output_file,
2921                  f ==> files_file,
2922                  j ==> job_file,
2923                  l ==> log_file,
2924                  e ==> stderr_file,
2925                  q ==> qout_file,
2926                  all ==> all files.
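
        Example (illustrative)::

            paths = task.select_files("ol")   # paths of the output and log file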
2927        """
2928        choices = collections.OrderedDict([
2929            ("i", self.input_file),
2930            ("o", self.output_file),
2931            ("f", self.files_file),
2932            ("j", self.job_file),
2933            ("l", self.log_file),
2934            ("e", self.stderr_file),
2935            ("q", self.qout_file),
2936        ])
2937
2938        if what == "all":
2939            return [getattr(v, "path") for v in choices.values()]
2940
2941        selected = []
2942        for c in what:
2943            try:
2944                selected.append(getattr(choices[c], "path"))
2945            except KeyError:
2946                self.history.warning("Wrong keyword %s" % c)
2947
2948        return selected
2949
2950    def restart(self):
2951        """
2952        Generic restart to be used when scheduler problems have been taken care of.
2953        """
2954        return self._restart()
2955
2956    #@check_spectator
2957    def reset_from_scratch(self):
2958        """
2959        Restart from scratch. This is to be used if a job is restarted with more resources after a crash.
2960
2961        Move the output files produced in workdir to _reset, otherwise check_status continues
2962        to see the task as crashed even if the job did not run.
2963        """
2964        # Create reset directory if not already done.
2965        reset_dir = os.path.join(self.workdir, "_reset")
2966        reset_file = os.path.join(reset_dir, "_counter")
2967        if not os.path.exists(reset_dir):
2968            os.mkdir(reset_dir)
2969            num_reset = 1
2970        else:
2971            with open(reset_file, "rt") as fh:
2972                num_reset = 1 + int(fh.read())
2973
2974        # Move files to reset and append digit with reset index.
2975        def move_file(f):
2976            if not f.exists: return
2977            try:
2978                f.move(os.path.join(reset_dir, f.basename + "_" + str(num_reset)))
2979            except OSError as exc:
2980                self.history.warning("Couldn't move file {}. exc: {}".format(f, str(exc)))
2981
2982        for fname in ("output_file", "log_file", "stderr_file", "qout_file", "qerr_file"):
2983            move_file(getattr(self, fname))
2984
2985        with open(reset_file, "wt") as fh:
2986            fh.write(str(num_reset))
2987
2988        self.start_lockfile.remove()
2989
2990        # Reset datetimes
2991        self.datetimes.reset()
2992
2993        return self._restart(submit=False)
2994
2995    #@check_spectator
2996    def fix_abicritical(self):
2997        """
2998        Method to fix crashes/errors caused by Abinit.
2999
3000        Returns:
3001            1 if task has been fixed else 0.
3002        """
3003        event_handlers = self.event_handlers
3004        if not event_handlers:
3005            self.set_status(status=self.S_ERROR, msg='Empty list of event handlers. Cannot fix abi_critical errors')
3006            return 0
3007
3008        count, done = 0, len(event_handlers) * [0]
3009
3010        report = self.get_event_report()
3011        if report is None:
3012            self.set_status(status=self.S_ERROR, msg='get_event_report returned None')
3013            return 0
3014
3015        # Note we have to loop over all possible events (slow, I know)
3016        # because we can have handlers for Error, Bug or Warning
3017        # (ideally only for CriticalWarnings but this is not done yet)
3018        for event in report:
3019            for i, handler in enumerate(self.event_handlers):
3020
3021                if handler.can_handle(event) and not done[i]:
3022                    self.history.info("handler %s will try to fix event %s" % (handler, event))
3023                    try:
3024                        d = handler.handle_task_event(self, event)
3025                        if d:
3026                            done[i] += 1
3027                            count += 1
3028
3029                    except Exception as exc:
3030                        self.history.critical(str(exc))
3031
3032        if count:
3033            self.reset_from_scratch()
3034            return 1
3035
3036        self.set_status(status=self.S_ERROR, msg='We encountered AbiCritical events that could not be fixed')
3037        return 0
3038
3039    #@check_spectator
3040    def fix_queue_critical(self):
3041        """
3042        This function tries to fix critical events originating from the queue submission system.
3043
3044        General strategy: first try to increase resources in order to fix the problem;
3045        if this is not possible, call a task-specific method to attempt to decrease the demands.
3046
3047        Returns:
3048            1 if task has been fixed else 0.
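
        Sketch of the escalation strategy implemented below (illustrative)::

            NodeFailureError  --> exclude the problematic nodes and reset from scratch
            MemoryCancelError --> more ncpus --> more memory per cpu --> reduce_memory_demand()
            TimeCancelError   --> more walltime --> more ncpus --> speed_up()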
3049        """
3050        from abipy.flowtk.scheduler_error_parsers import NodeFailureError, MemoryCancelError, TimeCancelError
3051        #assert isinstance(self.manager, TaskManager)
3052
3053        self.history.info('fixing queue critical')
3054        ret = "task.fix_queue_critical: "
3055
3056        if not self.queue_errors:
3057            # TODO
3058            # paral_kgb = 1 leads to nasty sigsegv that are seen as Qcritical errors!
3059            # Try to fallback to the conjugate gradient.
3060            #if self.uses_paral_kgb(1):
3061            #    self.history.critical("QCRITICAL with PARAL_KGB==1. Will try CG!")
3062            #    self.set_vars(paral_kgb=0)
3063            #    self.reset_from_scratch()
3064            #    return
3065            # Queue error but no errors detected: try to solve by increasing ncpus if the task scales.
3066            # If resources are already at their maximum, the task is definitively marked as errored.
3067            if self.mem_scales or self.load_scales:
3068                try:
3069                    self.manager.increase_resources()  # acts either on the policy or on the qadapter
3070                    self.reset_from_scratch()
3071                    ret += "increased resources"
3072                    return ret
3073                except ManagerIncreaseError:
3074                    self.set_status(self.S_ERROR, msg='unknown queue error, could not increase resources any further')
3075                    raise FixQueueCriticalError
3076            else:
3077                self.set_status(self.S_ERROR, msg='unknown queue error, no options left')
3078                raise FixQueueCriticalError
3079
3080        else:
3081            print("Fix_qcritical: received %d queue_errors" % len(self.queue_errors))
3082            print("type_list: %s" % list(type(qe) for qe in self.queue_errors))
3083
3084            for error in self.queue_errors:
3085                self.history.info('fixing: %s' % str(error))
3086                ret += str(error)
3087                if isinstance(error, NodeFailureError):
3088                    # if the problematic node is known, exclude it
3089                    if error.nodes is not None:
3090                        try:
3091                            self.manager.exclude_nodes(error.nodes)
3092                            self.reset_from_scratch()
3093                            self.set_status(self.S_READY, msg='excluding nodes')
3094                        except Exception:
3095                            raise FixQueueCriticalError
3096                    else:
3097                        self.set_status(self.S_ERROR, msg='Node error but no node identified.')
3098                        raise FixQueueCriticalError
3099
3100                elif isinstance(error, MemoryCancelError):
3101                    # Ask the qadapter to provide more resources, i.e. more cpus so more total memory. If the code
3102                    # scales, this should fix the memory problem.
3103                    # Increase both max and min ncpus of autoparal and rerun autoparal.
3104                    if self.mem_scales:
3105                        try:
3106                            self.manager.increase_ncpus()
3107                            self.reset_from_scratch()
3108                            self.set_status(self.S_READY, msg='increased ncpus to solve memory problem')
3109                            return
3110                        except ManagerIncreaseError:
3111                            self.history.warning('increasing ncpus failed')
3112
3113                    # if the max is reached, try to increase the memory per cpu:
3114                    try:
3115                        self.manager.increase_mem()
3116                        self.reset_from_scratch()
3117                        self.set_status(self.S_READY, msg='increased mem')
3118                        return
3119                    except ManagerIncreaseError:
3120                        self.history.warning('increasing mem failed')
3121
3122                    # if this failed ask the task to provide a method to reduce the memory demand
3123                    try:
3124                        self.reduce_memory_demand()
3125                        self.reset_from_scratch()
3126                        self.set_status(self.S_READY, msg='decreased mem demand')
3127                        return
3128                    except DecreaseDemandsError:
3129                        self.history.warning('decreasing demands failed')
3130
3131                    msg = ('Memory error detected but the memory could not be increased, nor could the\n'
3132                           'memory demand be decreased. Unrecoverable error.')
3133                    self.set_status(self.S_ERROR, msg)
3134                    raise FixQueueCriticalError
3135
3136                elif isinstance(error, TimeCancelError):
3137                    # ask the qadapter to provide more time
3138                    print('trying to increase time')
3139                    try:
3140                        self.manager.increase_time()
3141                        self.reset_from_scratch()
3142                        self.set_status(self.S_READY, msg='increased wall time')
3143                        return
3144                    except ManagerIncreaseError:
3145                        self.history.warning('increasing the walltime failed')
3146
3147                    # if this fails ask the qadapter to increase the number of cpus
3148                    if self.load_scales:
3149                        try:
3150                            self.manager.increase_ncpus()
3151                            self.reset_from_scratch()
3152                            self.set_status(self.S_READY, msg='increased number of cpus')
3153                            return
3154                        except ManagerIncreaseError:
3155                            self.history.warning('increasing ncpus to speed up the calculation and stay within the walltime failed')
3156
3157                    # if this failed ask the task to provide a method to speed up the task
3158                    try:
3159                        self.speed_up()
3160                        self.reset_from_scratch()
3161                        self.set_status(self.S_READY, msg='task speedup')
3162                        return
3163                    except DecreaseDemandsError:
3164                        self.history.warning('decreasing demands failed')
3165
3166                    msg = ('Time cancel error detected but the walltime could not be increased, nor could\n'
3167                           'the time demand be decreased by speeding up the task or increasing the number of cpus.\n'
3168                           'Unrecoverable error.')
3169                    self.set_status(self.S_ERROR, msg)
3170
3171                else:
3172                    msg = 'No solution provided for error %s. Unrecoverable error.' % error.name
3173                    self.set_status(self.S_ERROR, msg)
3174
3175        return 0
3176
3177    def parse_timing(self):
3178        """
3179        Parse the timer data in the main output file of Abinit.
3180        Requires timopt /= 0 in the input file (usually timopt = -1)
3181
3182        Return: :class:`AbinitTimerParser` instance, None if error.
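
        Example (illustrative)::

            parser = task.parse_timing()
            if parser is not None:
                # Inspect the parsed timing data; see AbinitTimerParser for the available methods.
                print(parser)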
3183        """
3184
3185        parser = AbinitTimerParser()
3186        read_ok = parser.parse(self.output_file.path)
3187        if read_ok:
3188            return parser
3189        return None
3190
3191
3192class ProduceHist(object):
3193    """
3194    Mixin class for a |Task| producing a HIST file.
3195    Provides the method `open_hist` that reads and returns a HIST file.
3196    """
3197    @property
3198    def hist_path(self):
3199        """Absolute path of the HIST file. Empty string if file is not present."""
3200        # Lazy property to avoid multiple calls to has_abiext.
3201        try:
3202            return self._hist_path
3203        except AttributeError:
3204            path = self.outdir.has_abiext("HIST")
3205            if path: self._hist_path = path
3206            return path
3207
3208    def open_hist(self):
3209        """
3210        Open the HIST file located in self.outdir.
3211        Returns |HistFile| object, None if file could not be found or file is not readable.
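
        Example (illustrative)::

            hist = task.open_hist()
            if hist is not None:
                print(hist.final_structure)   # assumes the |HistFile| API exposes final_structure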
3212        """
3213        if not self.hist_path:
3214            if self.status == self.S_OK:
3215                self.history.critical("%s reached S_OK but didn't produce a HIST file in %s" % (self, self.outdir))
3216            return None
3217
3218        # Open the HIST file
3219        from abipy.dynamics.hist import HistFile
3220        try:
3221            return HistFile(self.hist_path)
3222        except Exception as exc:
3223            self.history.critical("Exception while reading HIST file at %s:\n%s" % (self.hist_path, str(exc)))
3224            return None
3225
3226
3227class GsTask(AbinitTask):
3228    """
3229    Base class for ground-state tasks. A ground-state task produces a GSR file.
3230    Provides the method `open_gsr` that reads and returns a GSR file.
3231    """
3232    @property
3233    def gsr_path(self):
3234        """Absolute path of the GSR file. Empty string if file is not present."""
3235        # Lazy property to avoid multiple calls to has_abiext.
3236        try:
3237            return self._gsr_path
3238        except AttributeError:
3239            path = self.outdir.has_abiext("GSR")
3240            if path: self._gsr_path = path
3241            return path
3242
3243    def open_gsr(self):
3244        """
3245        Open the GSR file located in self.outdir.
3246        Returns |GsrFile| object, None if file could not be found or file is not readable.
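
        Example (illustrative)::

            gsr = task.open_gsr()
            if gsr is not None:
                print(gsr.energy)   # assumes the |GsrFile| API exposes the total energy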
3247        """
3248        gsr_path = self.gsr_path
3249        if not gsr_path:
3250            if self.status == self.S_OK:
3251                self.history.critical("%s reached S_OK but didn't produce a GSR file in %s" % (self, self.outdir))
3252            return None
3253
3254        # Open the GSR file.
3255        from abipy.electrons.gsr import GsrFile
3256        try:
3257            return GsrFile(gsr_path)
3258        except Exception as exc:
3259            self.history.critical("Exception while reading GSR file at %s:\n%s" % (gsr_path, str(exc)))
3260            return None
3261
3262
3263class ScfTask(GsTask):
3264    """
3265    Self-consistent ground-state calculations.
3266    Provide support for in-place restart via (WFK|DEN) files
3267    """
3268    CRITICAL_EVENTS = [
3269        events.ScfConvergenceWarning,
3270    ]
3271
3272    color_rgb = np.array((255, 0, 0)) / 255
3273
3274    def restart(self):
3275        """SCF calculations can be restarted if we have either the WFK file or the DEN file."""
3276        # Prefer WFK over DEN files since we can reuse the wavefunctions.
3277        for ext in ("WFK", "DEN"):
3278            restart_file = self.outdir.has_abiext(ext)
3279            irdvars = irdvars_for_ext(ext)
3280            if restart_file: break
3281        else:
3282            raise self.RestartError("%s: Cannot find WFK or DEN file to restart from." % self)
3283
3284        # Move out --> in.
3285        self.out_to_in(restart_file)
3286
3287        # Add the appropriate variable for restarting.
3288        self.set_vars(irdvars)
3289
3290        # Now we can resubmit the job.
3291        self.history.info("Will restart from %s", restart_file)
3292        return self._restart()
3293
3294    def inspect(self, **kwargs):
3295        """
3296        Plot the SCF cycle results with matplotlib.
3297
3298        Returns: |matplotlib-Figure| or None if some error occurred.
3299        """
3300        try:
3301            scf_cycle = abiinspect.GroundStateScfCycle.from_file(self.output_file.path)
3302        except IOError:
3303            return None
3304
3305        if scf_cycle is not None:
3306            if "title" not in kwargs: kwargs["title"] = str(self)
3307            return scf_cycle.plot(**kwargs)
3308
3309        return None
3310
3311    def get_results(self, **kwargs):
3312        results = super().get_results(**kwargs)
3313
3314        # Open the GSR file and add its data to results.out
3315        with self.open_gsr() as gsr:
3316            results["out"].update(gsr.as_dict())
3317            # Add files to GridFS
3318            results.register_gridfs_files(GSR=gsr.filepath)
3319
3320        return results
3321
3322
3323class CollinearThenNonCollinearScfTask(ScfTask):
3324    """
3325    A specialized ScfTask that performs an initial SCF run with nsppol = 2.
3326    The spin polarized WFK file is then used to start a non-collinear SCF run (nspinor == 2)
3327    initialized from the previous WFK file.
3328    """
3329    def __init__(self, input, workdir=None, manager=None, deps=None):
3330        super().__init__(input, workdir=workdir, manager=manager, deps=deps)
3331        # Enforce nspinor = 1, nsppol = 2 and prtwf = 1.
3332        self._input = self.input.deepcopy()
3333        self.input.set_spin_mode("polarized")
3334        self.input.set_vars(prtwf=1)
3335        self.collinear_done = False
3336
3337    def _on_ok(self):
3338        results = super()._on_ok()
3339        if not self.collinear_done:
3340            self.input.set_spin_mode("spinor")
3341            self.collinear_done = True
3342            self.finalized = False
3343            self.restart()
3344
3345        return results
3346
3347
3348class NscfTask(GsTask):
3349    """
3350    Non-self-consistent GS calculation. Provides in-place restart via WFK files.
3351    """
3352    CRITICAL_EVENTS = [
3353        events.NscfConvergenceWarning,
3354    ]
3355
3356    color_rgb = np.array((200, 80, 100)) / 255
3357
3358    def setup(self):
3359        """
3360        NSCF calculations should use the same FFT mesh as the one employed in the GS task
3361        (in principle, it's possible to interpolate inside Abinit but tests revealed some numerical noise).
3362        Here we change the input file of the NSCF task to have the same FFT mesh.
3363        """
3364        for dep in self.deps:
3365            if "DEN" in dep.exts:
3366                parent_task = dep.node
3367                break
3368        else:
3369            raise RuntimeError("Cannot find parent node producing DEN file")
3370
3371        with parent_task.open_gsr() as gsr:
3372            if hasattr(gsr, "reader"):
3373                den_mesh = gsr.reader.read_ngfft3()
3374            else:
3375                den_mesh = None
3376                self.history.warning("Cannot read ngfft3 from file. Likely Fortran file!")
3377
3378            if self.ispaw:
3379                self.set_vars(ngfftdg=den_mesh)
3380            else:
3381                self.set_vars(ngfft=den_mesh)
3382
3383        super().setup()
3384
3385    def restart(self):
3386        """NSCF calculations can be restarted only if we have the WFK file."""
3387        ext = "WFK"
3388        restart_file = self.outdir.has_abiext(ext)
3389        if not restart_file:
3390            raise self.RestartError("%s: Cannot find the WFK file to restart from." % self)
3391
3392        # Move out --> in.
3393        self.out_to_in(restart_file)
3394
3395        # Add the appropriate variable for restarting.
3396        irdvars = irdvars_for_ext(ext)
3397        self.set_vars(irdvars)
3398
3399        # Now we can resubmit the job.
3400        self.history.info("Will restart from %s", restart_file)
3401        return self._restart()
3402
3403    def get_results(self, **kwargs):
3404        results = super().get_results(**kwargs)
3405
3406        # Read the GSR file.
3407        with self.open_gsr() as gsr:
3408            results["out"].update(gsr.as_dict())
3409            # Add files to GridFS
3410            results.register_gridfs_files(GSR=gsr.filepath)
3411
3412        return results
3413
3414
3415class RelaxTask(GsTask, ProduceHist):
3416    """
3417    Task for structural optimizations.
3418    """
3419    # TODO possible ScfConvergenceWarning?
3420    CRITICAL_EVENTS = [
3421        events.RelaxConvergenceWarning,
3422    ]
3423
3424    color_rgb = np.array((255, 61, 255)) / 255
3425
3426    def get_final_structure(self):
3427        """Read the final structure from the GSR file."""
3428        try:
3429            with self.open_gsr() as gsr:
3430                return gsr.structure
3431        except AttributeError:
3432            raise RuntimeError("Cannot find the GSR file with the final structure to restart from.")
3433
3434    def restart(self):
3435        """
3436        Restart the structural relaxation.
3437
3438        Structure relaxations can be restarted only if we have the WFK file or the DEN or the GSR file
3439        from which we can read the last structure (mandatory) and the wavefunctions (not mandatory but useful).
3440        Prefer WFK over other files since we can reuse the wavefunctions.
3441
3442        .. note::
3443
3444            The problem in the present approach is that some parameters in the input
3445            are computed from the initial structure and may not be consistent with
3446            the modification of the structure done during the structure relaxation.
3447        """
3448        restart_file = None
3449
3450        # Try to restart from the WFK file if possible.
3451        # FIXME: This part has been disabled because WFK=IO is a mess if paral_kgb == 1
3452        # This is also the reason why I wrote my own MPI-IO code for the GW part!
3453        wfk_file = self.outdir.has_abiext("WFK")
3454        if False and wfk_file:
3455            irdvars = irdvars_for_ext("WFK")
3456            restart_file = self.out_to_in(wfk_file)
3457
3458        # Fallback to DEN file. Note that here we look for out_DEN instead of out_TIM?_DEN
3459        # This happens when the previous run completed and task.on_done has been performed.
3460        # ********************************************************************************
3461        # Note that it's possible to have an undetected error if we have multiple restarts
3462        # and the last relax died badly. In this case indeed out_DEN is the file produced
3463        # by the last run that has executed on_done.
3464        # ********************************************************************************
3465        if restart_file is None:
3466            for ext in ("", ".nc"):
3467                out_den = self.outdir.path_in("out_DEN" + ext)
3468                if os.path.exists(out_den):
3469                    irdvars = irdvars_for_ext("DEN")
3470                    restart_file = self.out_to_in(out_den)
3471                    break
3472
3473        if restart_file is None:
3474            # Try to restart from the last TIM?_DEN file.
3475            # This should happen if the previous run didn't complete in clean way.
3476            # Find the last TIM?_DEN file.
3477            last_timden = self.outdir.find_last_timden_file()
3478            if last_timden is not None:
3479                if last_timden.path.endswith(".nc"):
3480                    ofile = self.outdir.path_in("out_DEN.nc")
3481                else:
3482                    ofile = self.outdir.path_in("out_DEN")
3483                os.rename(last_timden.path, ofile)
3484                restart_file = self.out_to_in(ofile)
3485                irdvars = irdvars_for_ext("DEN")
3486
3487        if restart_file is None:
3488            # Don't raise RestartError as we can still change the structure.
3489            self.history.warning("Cannot find the WFK|DEN|TIM?_DEN file to restart from.")
3490        else:
3491            # Add the appropriate variable for restarting.
3492            self.set_vars(irdvars)
3493            self.history.info("Will restart from %s", restart_file)
3494
3495        # FIXME Here we should read the HIST file but restartxf is broken!
3496        #self.set_vars({"restartxf": -1})
3497
3498        # Read the relaxed structure from the GSR file and change the input.
3499        self._change_structure(self.get_final_structure())
3500
3501        # Now we can resubmit the job.
3502        return self._restart()
3503
3504    def inspect(self, **kwargs):
3505        """
3506        Plot the evolution of the structural relaxation with matplotlib.
3507
3508        Args:
3509            what: Either "hist" or "scf". The first option (default) extracts data
3510                from the HIST file and plot the evolution of the structural
3511                parameters, forces, pressures and energies.
3512                The second option, extracts data from the main output file and
3513                plot the evolution of the SCF cycles (etotal, residuals, etc).
3514
3515        Returns: |matplotlib-Figure| or None if some error occurred.
3516        """
3517        what = kwargs.pop("what", "hist")
3518
3519        if what == "hist":
3520            # Read the hist file to get access to the structure.
3521            with self.open_hist() as hist:
3522                return hist.plot(**kwargs) if hist else None
3523
3524        elif what == "scf":
3525            # Get info on the different SCF cycles
3526            relaxation = abiinspect.Relaxation.from_file(self.output_file.path)
3527            if "title" not in kwargs: kwargs["title"] = str(self)
3528            return relaxation.plot(**kwargs) if relaxation is not None else None
3529
3530        else:
3531            raise ValueError("Wrong value for what: %s" % what)
3532
3533    def get_results(self, **kwargs):
3534        results = super().get_results(**kwargs)
3535
3536        # Open the GSR file and add its data to results.out
3537        with self.open_gsr() as gsr:
3538            results["out"].update(gsr.as_dict())
3539            # Add files to GridFS
3540            results.register_gridfs_files(GSR=gsr.filepath)
3541
3542        return results
3543
3544    def reduce_dilatmx(self, target=1.01):
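        """
        Reduce the value of dilatmx, moving it towards ``target`` by at most 5% of the current value
        per call. Illustrative example: dilatmx = 1.1 and target = 1.01 gives
        new_dilatmx = 1.1 - min(0.09, 0.055) = 1.045.
        """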
3545        actual_dilatmx = self.get_inpvar('dilatmx', 1.)
3546        new_dilatmx = actual_dilatmx - min((actual_dilatmx-target), actual_dilatmx*0.05)
3547        self.set_vars(dilatmx=new_dilatmx)
3548
3549    def fix_ofiles(self):
3550        """
3551        Note that ABINIT produces lots of out_TIM?_DEN files, one for each relaxation step.
3552        Here we list all TIM*_DEN files, select the last one and rename it to out_DEN.
3553
3554        This change is needed so that we can specify dependencies with the syntax {node: "DEN"}
3555        without having to know the number of iterations needed to converge the run in node!
3556        """
3557        super().fix_ofiles()
3558
3559        # Find the last TIM?_DEN file.
3560        last_timden = self.outdir.find_last_timden_file()
3561        if last_timden is None:
3562            self.history.warning("Cannot find TIM?_DEN files")
3563            return
3564
3565        # Rename last TIMDEN with out_DEN.
3566        ofile = self.outdir.path_in("out_DEN")
3567        if last_timden.path.endswith(".nc"): ofile += ".nc"
3568        self.history.info("Renaming last_denfile %s --> %s" % (last_timden.path, ofile))
3569        os.rename(last_timden.path, ofile)
3570
3571
3572class DfptTask(AbinitTask):
3573    """
3574    Base class for DFPT tasks (Phonons, DdeTask, DdkTask, ElasticTask ...)
3575    Mainly used to implement methods that are common to DFPT calculations with Abinit.
3576    Provides the method `open_ddb` that reads and returns a DDB file.
3577
3578    .. warning::
3579
3580        This class should not be instantiated directly.
3581    """
3582    # TODO:
3583    # for the time being we don't discern between GS and PhononCalculations.
3584    CRITICAL_EVENTS = [
3585        events.ScfConvergenceWarning,
3586    ]
3587
3588    def __repr__(self):
3589        # Get info about the DFPT perturbation from the input file.
3590        qpt = self.input.get("qpt", [0, 0, 0])
3591        rfphon = self.input.get("rfphon", 0)
3592        rfatpol = self.input.get("rfatpol", [1, 1])
3593        rfelfd = self.input.get("rfelfd", 0)
3594        rfstrs = self.input.get("rfstrs", 0)
3595        rfdir = self.input.get("rfdir", [0, 0, 0])
3596        irdddk = self.input.get("irdddk", 0)
3597
3598        dfpt_info = ""
3599        if rfphon != 0:
3600            dfpt_info = "qpt: {}, rfphon: {}, rfatpol: {}, rfdir: {}, irdddk: {}".format(
3601                    qpt, rfphon, rfatpol, rfdir, irdddk)
3602
3603        elif rfelfd != 0:
3604            dfpt_info = "qpt: {}, rfelfd: {} rfdir: {}, irdddk: {}".format(
3605                    qpt, rfelfd, rfdir, irdddk)
3606
3607        elif rfstrs != 0:
3608            dfpt_info = "qpt: {}, rfstrs: {}, rfdir: {}, irdddk: {}".format(
3609                    qpt, rfstrs, rfdir, irdddk)
3610
3611        try:
3612            return "<%s, node_id=%s, workdir=%s, %s>" % (
3613                self.__class__.__name__, self.node_id, self.relworkdir, dfpt_info)
3614        except AttributeError:
3615            # this usually happens when workdir has not been initialized
3616            return "<%s, node_id=%s, workdir=None, %s>" % (
3617                self.__class__.__name__, self.node_id, dfpt_info)
3618
3619    @property
3620    def ddb_path(self):
3621        """Absolute path of the DDB file. Empty string if file is not present."""
3622        # Lazy property to avoid multiple calls to has_abiext.
3623        try:
3624            return self._ddb_path
3625        except AttributeError:
3626            path = self.outdir.has_abiext("DDB")
3627            if path: self._ddb_path = path
3628            return path
3629
3630    def open_ddb(self):
3631        """
3632        Open the DDB file located in self.outdir.
3633        Returns a |DdbFile| object, None if file could not be found or file is not readable.
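
        Example (illustrative)::

            ddb = task.open_ddb()
            if ddb is not None:
                print(ddb.structure)   # assumes the |DdbFile| API exposes the structure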
3634        """
3635        ddb_path = self.ddb_path
3636        if not ddb_path:
3637            if self.status == self.S_OK:
3638                self.history.critical("%s reached S_OK but didn't produce a DDB file in %s" % (self, self.outdir))
3639            return None
3640
3641        # Open the DDB file.
3642        from abipy.dfpt.ddb import DdbFile
3643        try:
3644            return DdbFile(ddb_path)
3645        except Exception as exc:
3646            self.history.critical("Exception while reading DDB file at %s:\n%s" % (ddb_path, str(exc)))
3647            return None
3648
3649    def make_links(self):
3650        """
3651        Replace the default behaviour of make_links. More specifically, this method
3652        implements the logic required to connect DFPT calculations to `DDK` files.
3653        Remember that DDK is an extension introduced in AbiPy to deal with the
3654        irdddk input variable and the fact that the 3 files with du/dk produced by Abinit
3655        have a file extension constructed from the number of atoms (e.g. 1WF[3natom + 1]).
3656
3657        AbiPy uses the user-friendly syntax deps={node: "DDK"} to specify that
3658        the children will read the DDK from `node` but this also means that
3659        we have to implement extra logic to handle this case at runtime.
3660        """
3661        for dep in self.deps:
3662            for d in dep.exts:
3663
3664                if d == "DDK":
3665                    ddk_task = dep.node
3666                    out_ddk = ddk_task.outdir.has_abiext("DDK")
3667                    if not out_ddk:
3668                        raise RuntimeError("%s didn't produce the DDK file" % ddk_task)
3669
3670                    # Get (fortran) idir and construct the name of the 1WF file expected by Abinit.
3671                    rfdir = list(ddk_task.input["rfdir"])
3672                    if rfdir.count(1) != 1:
3673                        raise RuntimeError("Only one direction should be specified in rfdir but rfdir = %s" % rfdir)
3674
3675                    idir = rfdir.index(1) + 1
3676                    ddk_case = idir + 3 * len(ddk_task.input.structure)
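                    # Worked example: rfdir = [0, 1, 0] and natom = 2 give idir = 2 and
                    # ddk_case = 2 + 3 * 2 = 8 so the parent's out_1WF8 is linked below as in_1WF8
                    # (with an extra ".nc" suffix when the DDK file is in netcdf format).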
3677
3678                    infile = self.indir.path_in("in_1WF%d" % ddk_case)
3679                    if out_ddk.endswith(".nc"): infile = infile + ".nc"
3680                    os.symlink(out_ddk, infile)
3681
3682                elif d in ("WFK", "WFQ"):
3683                    gs_task = dep.node
3684                    out_wfk = gs_task.outdir.has_abiext(d)
3685                    if not out_wfk:
3686                        raise RuntimeError("%s didn't produce the %s file" % (gs_task, d))
3687
3688                    if d == "WFK":
3689                        bname = "in_WFK"
3690                    elif d == "WFQ":
3691                        bname = "in_WFQ"
3692                    else:
3693                        raise ValueError("Don't know how to handle `%s`" % d)
3694
3695                    # Ensure link has .nc extension if iomode 3
3696                    if out_wfk.endswith(".nc"): bname = bname + ".nc"
3697                    #print(d, out_wfk, "bname", bname)
3698                    if not os.path.exists(self.indir.path_in(bname)):
3699                        os.symlink(out_wfk, self.indir.path_in(bname))
3700
3701                elif d == "DEN":
3702                    gs_task = dep.node
3703                    out_wfk = gs_task.outdir.has_abiext("DEN")
3704                    if not out_wfk:
3705                        raise RuntimeError("%s didn't produce the DEN file" % gs_task)
3706                    infile = self.indir.path_in("in_DEN")
3707                    if out_wfk.endswith(".nc"): infile = infile + ".nc"
3708                    if not os.path.exists(infile):
3709                        os.symlink(out_wfk, infile)
3710
3711                elif d == "1WF":
3712                    gs_task = dep.node
3713                    out_wfk = gs_task.outdir.has_abiext("1WF")
3714                    if not out_wfk:
3715                        raise RuntimeError("%s didn't produce the 1WF file" % gs_task)
3716                    dest = self.indir.path_in("in_" + out_wfk.split("_")[-1])
3717                    if out_wfk.endswith(".nc"): dest = dest + ".nc"
3718                    if not os.path.exists(dest):
3719                        os.symlink(out_wfk, dest)
3720
3721                elif d == "1DEN":
3722                    gs_task = dep.node
3723                    out_wfk = gs_task.outdir.has_abiext("DEN")
3724                    if not out_wfk:
3725                        raise RuntimeError("%s didn't produce the 1DEN file" % gs_task)
3726                    dest = self.indir.path_in("in_" + out_wfk.split("_")[-1])
3727                    if out_wfk.endswith(".nc"): dest = dest + ".nc"
3728                    if not os.path.exists(dest):
3729                        os.symlink(out_wfk, dest)
3730
3731                else:
3732                    raise ValueError("Don't know how to handle extension: %s" % str(dep.exts))
3733
3734    def restart(self):
3735        """
3736        DFPT calculations can be restarted only if we have the 1WF file or the 1DEN file
3737        from which we can read the first-order wavefunctions or the first-order density.
3738        Prefer 1WF over 1DEN since we can reuse the wavefunctions.
3739        """
3740        # Abinit adds the idir-ipert index at the end of the file and this breaks the extension
3741        # e.g. out_1WF4, out_DEN4. find_1wf_files and find_1den_files returns the list of files found
3742        restart_file, irdvars = None, None
3743
3744        # Highest priority to the 1WF file because restart is more efficient.
3745        wf_files = self.outdir.find_1wf_files()
3746        if wf_files is not None:
3747            restart_file = wf_files[0].path
3748            irdvars = irdvars_for_ext("1WF")
3749            if len(wf_files) != 1:
3750                restart_file = None
3751                self.history.critical("Found more than one 1WF file in outdir. Restart is ambiguous!")
3752
3753        if restart_file is None:
3754            den_files = self.outdir.find_1den_files()
3755            if den_files is not None:
3756                restart_file = den_files[0].path
3757                irdvars = {"ird1den": 1}
3758                if len(den_files) != 1:
3759                    restart_file = None
3760                    self.history.critical("Found more than one 1DEN file in outdir. Restart is ambiguous!")
3761
3762        if restart_file is None:
3763            # Raise because otherwise restart is equivalent to a run from scratch --> infinite loop!
3764            raise self.RestartError("%s: Cannot find the 1WF|1DEN file to restart from." % self)
3765
3766        # Move file.
3767        self.history.info("Will restart from %s", restart_file)
3768        restart_file = self.out_to_in(restart_file)
3769
3770        # Add the appropriate variable for restarting.
3771        self.set_vars(irdvars)
3772
3773        # Now we can resubmit the job.
3774        return self._restart()
3775
3776
3777class DdeTask(DfptTask):
3778    """Task for DDE calculations (perturbation wrt electric field)."""
3779
3780    color_rgb = np.array((61, 158, 255)) / 255
3781
3782    def get_results(self, **kwargs):
3783        results = super().get_results(**kwargs)
3784        return results.register_gridfs_file(DDB=(self.outdir.has_abiext("DDE"), "t"))
3785
3786
3787class DteTask(DfptTask):
3788    """Task for DTE calculations."""
3789    color_rgb = np.array((204, 0, 204)) / 255
3790
3791    # @check_spectator
3792    def start(self, **kwargs):
3793        kwargs['autoparal'] = False
3794        return super().start(**kwargs)
3795
3796    def get_results(self, **kwargs):
3797        results = super().get_results(**kwargs)
3798        return results.register_gridfs_file(DDB=(self.outdir.has_abiext("DDE"), "t"))
3799
3800
3801class DdkTask(DfptTask):
3802    """Task for DDK calculations."""
3803    color_rgb = np.array((0, 204, 204)) / 255
3804
3805    #@check_spectator
3806    def _on_ok(self):
3807        super()._on_ok()
3808        # Client code expects to find du/dk in DDK file.
3809        # Here I create a symbolic link out_1WF13 --> out_DDK
3810        # so that we can use deps={ddk_task: "DDK"} in the high-level API.
3811        # The price to pay is that we have to handle the DDK extension in make_links.
3812        # See DfptTask.make_links
3813        self.outdir.symlink_abiext('1WF', 'DDK')
3814
3815    def get_results(self, **kwargs):
3816        results = super().get_results(**kwargs)
3817        return results.register_gridfs_file(DDK=(self.outdir.has_abiext("DDK"), "t"))
3818
3819
3820class BecTask(DfptTask):
3821    """
3822    Task for the calculation of Born effective charges.
3823
3824    bec_deps = {ddk_task: "DDK" for ddk_task in ddk_tasks}
3825    bec_deps.update({scf_task: "WFK"})
3826    """
3827    color_rgb = np.array((122, 122, 255)) / 255
3828
3829
3830class EffMassTask(DfptTask):
3831    """Task for effective mass calculations with DFPT."""
3832    color_rgb = np.array((0, 122, 204)) / 255
3833
3834
3835class PhononTask(DfptTask):
3836    """
3837    DFPT calculations for a single atomic perturbation.
3838    Provide support for in-place restart via (1WF|1DEN) files
3839    """
3840    color_rgb = np.array((0, 150, 250)) / 255
3841
3842    def inspect(self, **kwargs):
3843        """
3844        Plot the Phonon SCF cycle results with matplotlib.
3845
3846        Returns: |matplotlib-Figure| or None if some error occurred.
3847        """
3848        scf_cycle = abiinspect.PhononScfCycle.from_file(self.output_file.path)
3849        if scf_cycle is not None:
3850            if "title" not in kwargs: kwargs["title"] = str(self)
3851            return scf_cycle.plot(**kwargs)
3852
3853    def get_results(self, **kwargs):
3854        results = super().get_results(**kwargs)
3855        return results.register_gridfs_files(DDB=(self.outdir.has_abiext("DDB"), "t"))
3856
3857
3858class ElasticTask(DfptTask):
3859    """
3860    DFPT calculations for a single strain perturbation (uniaxial or shear strain).
3861    Provide support for in-place restart via (1WF|1DEN) files
3862    """
3863    color_rgb = np.array((255, 204, 255)) / 255
3864
3865
3866class EphTask(AbinitTask):
3867    """
3868    Class for electron-phonon calculations.
3869    """
3870    color_rgb = np.array((255, 128, 0)) / 255
3871
3872
3873class ManyBodyTask(AbinitTask):
3874    """
3875    Base class for Many-body tasks (Screening, Sigma, Bethe-Salpeter)
3876    Mainly used to implement methods that are common to MBPT calculations with Abinit.
3877
3878    .. warning::
3879
3880        This class should not be instantiated directly.
3881    """
3882    def reduce_memory_demand(self):
3883        """
3884        Method that can be called by the scheduler to decrease the memory demand of a specific task.
3885        Returns True in case of success, False in case of Failure.
3886        """
3887        # The first digit governs the storage of W(q), the second digit the storage of u(r)
3888        # Try to avoid the storage of u(r) first since reading W(q) from file will lead to a dramatic slowdown.
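        # Illustrative progression over successive calls: gwmem 11 -> 10 (stop storing u(r)) -> 00 (stop storing W(q) as well).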
3889        prev_gwmem = int(self.get_inpvar("gwmem", default=11))
3890        first_dig, second_dig = prev_gwmem // 10, prev_gwmem % 10
3891
3892        if second_dig == 1:
3893            self.set_vars(gwmem="%.2d" % (10 * first_dig))
3894            return True
3895
3896        if first_dig == 1:
3897            self.set_vars(gwmem="%.2d" % 00)
3898            return True
3899
3900        # gwmem 00 d'oh!
3901        return False
3902
3903
3904class ScrTask(ManyBodyTask):
3905    """Tasks for SCREENING calculations """
3906
3907    color_rgb = np.array((255, 128, 0)) / 255
3908
3909    @property
3910    def scr_path(self):
3911        """Absolute path of the SCR file. Empty string if file is not present."""
3912        # Lazy property to avoid multiple calls to has_abiext.
3913        try:
3914            return self._scr_path
3915        except AttributeError:
3916            path = self.outdir.has_abiext("SCR.nc")
3917            if path: self._scr_path = path
3918            return path
3919
3920    def open_scr(self):
3921        """
3922        Open the SCR file located in self.outdir.
3923        Returns |ScrFile| object, None if file could not be found or file is not readable.
3924        """
3925        scr_path = self.scr_path
3926
3927        if not scr_path:
3928            self.history.critical("%s didn't produce a SCR.nc file in %s" % (self, self.outdir))
3929            return None
3930
3931        # Open the SCR file.
3932        from abipy.electrons.scr import ScrFile
3933        try:
3934            return ScrFile(scr_path)
3935        except Exception as exc:
3936            self.history.critical("Exception while reading SCR file at %s:\n%s" % (scr_path, str(exc)))
3937            return None
3938
3939
3940class SigmaTask(ManyBodyTask):
3941    """
3942    Tasks for SIGMA calculations. Provides support for in-place restart via QPS files
3943    """
3944    CRITICAL_EVENTS = [
3945        events.QPSConvergenceWarning,
3946    ]
3947
3948    color_rgb = np.array((0, 255, 0)) / 255
3949
3950    def restart(self):
3951        # Sigma calculations can be restarted only if we have the QPS file
3952        # from which we can read the results of the previous step.
3953        ext = "QPS"
3954        restart_file = self.outdir.has_abiext(ext)
3955        if not restart_file:
3956            raise self.RestartError("%s: Cannot find the QPS file to restart from." % self)
3957
3958        self.out_to_in(restart_file)
3959
3960        # Add the appropriate variable for restarting.
3961        irdvars = irdvars_for_ext(ext)
3962        self.set_vars(irdvars)
3963
3964        # Now we can resubmit the job.
3965        self.history.info("Will restart from %s", restart_file)
3966        return self._restart()
3967
3968    #def inspect(self, **kwargs):
3969    #    """Plot graph showing the number of k-points computed and the wall-time used"""
3970
3971    @property
3972    def sigres_path(self):
3973        """Absolute path of the SIGRES file. Empty string if file is not present."""
3974        # Lazy property to avoid multiple calls to has_abiext.
3975        try:
3976            return self._sigres_path
3977        except AttributeError:
3978            path = self.outdir.has_abiext("SIGRES")
3979            if path: self._sigres_path = path
3980            return path
3981
3982    def open_sigres(self):
3983        """
3984        Open the SIGRES file located in self.outdir.
3985        Returns |SigresFile| object, None if file could not be found or file is not readable.
3986        """
3987        sigres_path = self.sigres_path
3988
3989        if not sigres_path:
3990            self.history.critical("%s didn't produce a SIGRES file in %s" % (self, self.outdir))
3991            return None
3992
3993        # Open the SIGRES file and add its data to results.out
3994        from abipy.electrons.gw import SigresFile
3995        try:
3996            return SigresFile(sigres_path)
3997        except Exception as exc:
3998            self.history.critical("Exception while reading SIGRES file at %s:\n%s" % (sigres_path, str(exc)))
3999            return None
4000
4001    def get_scissors_builder(self):
4002        """
4003        Returns an instance of :class:`ScissorsBuilder` from the SIGRES file.
4004
4005        Raise:
4006            `RuntimeError` if SIGRES file is not found.
4007        """
4008        from abipy.electrons.scissors import ScissorsBuilder
4009        if self.sigres_path:
4010            return ScissorsBuilder.from_file(self.sigres_path)
4011        else:
4012            raise RuntimeError("Cannot find SIGRES file!")
4013
4014    def get_results(self, **kwargs):
4015        results = super().get_results(**kwargs)
4016
4017        # Open the SIGRES file and add its data to results.out
4018        with self.open_sigres() as sigres:
4019            #results["out"].update(sigres.as_dict())
4020            results.register_gridfs_files(SIGRES=sigres.filepath)
4021
4022        return results
4023
4024
4025class BseTask(ManyBodyTask):
4026    """
4027    Task for Bethe-Salpeter calculations.
4028
4029    .. note::
4030
4031        The BSE code provides both iterative and direct schemes for the computation of the dielectric function.
4032        The direct diagonalization cannot be restarted whereas Haydock and CG support restarting.
4033    """
4034    CRITICAL_EVENTS = [
4035        events.HaydockConvergenceWarning,
4036        #events.BseIterativeDiagoConvergenceWarning,
4037    ]
4038
4039    color_rgb = np.array((128, 0, 255)) / 255
4040
4041    def restart(self):
4042        """
4043        BSE calculations with Haydock can be restarted only if we have the
4044        excitonic Hamiltonian and the HAYDR_SAVE file.
4045        """
4046        # TODO: This version seems to work but the main output file is truncated
4047        # (the log file is complete though).
4048        # TODO: Handle restart if CG method is used
4049        # TODO: restart should receive a list of critical events
4050        irdvars = {}
4051
4052        # Move the BSE blocks to indata.
4053        # This is done only once at the end of the first run.
4054        # Successive restarts will use the BSR|BSC files in the indir directory
4055        # to initialize the excitonic Hamiltonian
4056        count = 0
4057        for ext in ("BSR", "BSC"):
4058            ofile = self.outdir.has_abiext(ext)
4059            if ofile:
4060                count += 1
4061                irdvars.update(irdvars_for_ext(ext))
4062                self.out_to_in(ofile)
4063
4064        if not count:
4065            # outdir does not contain the BSR|BSC file.
4066            # This means that num_restart > 1 and the files should be in task.indir
4067            count = 0
4068            for ext in ("BSR", "BSC"):
4069                ifile = self.indir.has_abiext(ext)
4070                if ifile:
4071                    count += 1
4072
4073            if not count:
4074                raise self.RestartError("%s: Cannot find BSR|BSC files in %s" % (self, self.indir))
4075
4076        # Rename HAYDR_SAVE files
4077        count = 0
4078        for ext in ("HAYDR_SAVE", "HAYDC_SAVE"):
4079            ofile = self.outdir.has_abiext(ext)
4080            if ofile:
4081                count += 1
4082                irdvars.update(irdvars_for_ext(ext))
4083                self.out_to_in(ofile)
4084
4085        if not count:
4086            raise self.RestartError("%s: Cannot find the HAYDR_SAVE file to restart from." % self)
4087
4088        # Add the appropriate variable for restarting.
4089        self.set_vars(irdvars)
4090
4091        # Now we can resubmit the job.
4092        #self.history.info("Will restart from %s", restart_file)
4093        return self._restart()
4094
4095    #def inspect(self, **kwargs):
4096    #    """
4097    #    Plot the Haydock iterations with matplotlib.
4098    #
4099    #    Returns: |matplotlib-Figure| or None if some error occurred.
4100    #    """
4101    #    haydock_cycle = abiinspect.HaydockIterations.from_file(self.output_file.path)
4102    #    if haydock_cycle is not None:
4103    #        if "title" not in kwargs: kwargs["title"] = str(self)
4104    #        return haydock_cycle.plot(**kwargs)
4105
4106    @property
4107    def mdf_path(self):
4108        """Absolute path of the MDF file. Empty string if file is not present."""
4109        # Lazy property to avoid multiple calls to has_abiext.
4110        try:
4111            return self._mdf_path
4112        except AttributeError:
4113            path = self.outdir.has_abiext("MDF.nc")
4114            if path: self._mdf_path = path
4115            return path
4116
4117    def open_mdf(self):
4118        """
        Open the MDF file located in self.outdir.
4120        Returns |MdfFile| object, None if file could not be found or file is not readable.
4121        """
4122        mdf_path = self.mdf_path
4123        if not mdf_path:
4124            self.history.critical("%s didn't produce a MDF file in %s" % (self, self.outdir))
4125            return None
4126
        # Open the MDF file and add its data to results.out
4128        from abipy.electrons.bse import MdfFile
4129        try:
4130            return MdfFile(mdf_path)
4131        except Exception as exc:
4132            self.history.critical("Exception while reading MDF file at %s:\n%s" % (mdf_path, str(exc)))
4133            return None
4134
4135    def get_results(self, **kwargs):
4136        results = super().get_results(**kwargs)
4137
        # Open the MDF file (if any) and register it in the results.
        mdf = self.open_mdf()
        if mdf is not None:
            with mdf:
                #results["out"].update(mdf.as_dict())
                #epsilon_infinity optical_gap
                results.register_gridfs_files(MDF=mdf.filepath)
4142
4143        return results
4144
4145
4146class OpticTask(Task):
4147    """
    Task for the computation of optical spectra with optic, i.e.
    RPA without local-field effects and with the velocity operator computed from DDK files.
4150    """
4151    color_rgb = np.array((255, 204, 102)) / 255
4152
4153    def __init__(self, optic_input, nscf_node, ddk_nodes, use_ddknc=False, workdir=None, manager=None):
4154        """
        Create an instance of :class:`OpticTask` from an :class:`OpticInput` object.
4156
4157        Args:
4158            optic_input: :class:`OpticInput` object with optic variables.
4159            nscf_node: The task that will produce the WFK file with the KS energies or path to the WFK file.
4160            ddk_nodes: List of :class:`DdkTask` nodes that will produce the DDK files or list of DDK filepaths.
4161                Order (x, y, z)
4162            workdir: Path to the working directory.
4163            manager: |TaskManager| object.
4164        """
4165        # Convert paths to FileNodes
4166        self.nscf_node = Node.as_node(nscf_node)
4167        self.ddk_nodes = [Node.as_node(n) for n in ddk_nodes]
4168        assert len(ddk_nodes) == 3
4169        #print(self.nscf_node, self.ddk_nodes)
4170
4171        # Use DDK extension instead of 1WF
4172        if use_ddknc:
4173            deps = {n: "DDK.nc" for n in self.ddk_nodes}
4174        else:
4175            deps = {n: "1WF" for n in self.ddk_nodes}
4176
4177        deps.update({self.nscf_node: "WFK"})
4178
4179        super().__init__(optic_input, workdir=workdir, manager=manager, deps=deps)
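
    # Construction sketch (illustrative; variable names and file paths are hypothetical):
    # ``optic_inp`` is an OpticInput, ``nscf_task`` the task producing the WFK file and
    # ``ddk_tasks`` the three DdkTask nodes (or DDK file paths) ordered as (x, y, z).
    #
    #   optic_task = OpticTask(optic_inp, nscf_node=nscf_task, ddk_nodes=ddk_tasks)
    #   # or, with plain file paths:
    #   optic_task = OpticTask(optic_inp, nscf_node="gs_WFK", ddk_nodes=["x_1WF", "y_1WF", "z_1WF"])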
4180
4181    def set_workdir(self, workdir, chroot=False):
4182        """Set the working directory of the task."""
4183        super().set_workdir(workdir, chroot=chroot)
        # Small hack: the log file of optic is actually the main output file.
4185        self.output_file = self.log_file
4186
4187    def set_vars(self, *args, **kwargs):
4188        """
        Optic does not use `get` or `ird` variables, hence we should never try
        to change the input when we connect this task.
4191        """
4192        kwargs.update(dict(*args))
4193        self.history.info("OpticTask intercepted set_vars with args %s" % kwargs)
4194
4195        if "autoparal" in kwargs: self.input.set_vars(autoparal=kwargs["autoparal"])
4196        if "max_ncpus" in kwargs: self.input.set_vars(max_ncpus=kwargs["max_ncpus"])
4197
4198    @property
4199    def executable(self):
4200        """Path to the executable required for running the :class:`OpticTask`."""
4201        try:
4202            return self._executable
4203        except AttributeError:
4204            return "optic"
4205
4206    @property
4207    def filesfile_string(self):
4208        """String with the list of files and prefixes needed to execute ABINIT."""
4209        lines = []
4210        app = lines.append
4211
4212        app(self.input_file.path)                           # Path to the input file
4213        app(os.path.join(self.workdir, "unused"))           # Path to the output file
4214        app(os.path.join(self.workdir, self.prefix.odata))  # Prefix for output data
4215
4216        return "\n".join(lines)
4217
4218    @property
4219    def wfk_filepath(self):
4220        """Returns (at runtime) the absolute path of the WFK file produced by the NSCF run."""
4221        return self.nscf_node.outdir.has_abiext("WFK")
4222
4223    @property
4224    def ddk_filepaths(self):
4225        """Returns (at runtime) the absolute path of the DDK files produced by the DDK runs."""
        # This supports the new version of optic that uses DDK.nc files.
        paths = [ddk_task.outdir.has_abiext("DDK.nc") for ddk_task in self.ddk_nodes]
        if all(paths):
4229            return paths
4230
        # This is deprecated and can be removed when a new version of Abinit is released.
4232        return [ddk_task.outdir.has_abiext("1WF") for ddk_task in self.ddk_nodes]
4233
4234    def make_input(self):
4235        """Construct and write the input file of the calculation."""
4236        # Set the file paths.
4237        all_files = {"ddkfile_" + str(n + 1): ddk for n, ddk in enumerate(self.ddk_filepaths)}
4238        all_files.update({"wfkfile": self.wfk_filepath})
4239        files_nml = {"FILES": all_files}
4240        files = nmltostring(files_nml)
4241
4242        # Get the input specified by the user
4243        user_file = nmltostring(self.input.as_dict())
4244
4245        # Join them.
4246        return files + user_file
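
    # For illustration, the FILES namelist built above looks roughly like this
    # (file paths are hypothetical):
    #
    #   &FILES
    #   ddkfile_1 = '/path/to/x_1WF',
    #   ddkfile_2 = '/path/to/y_1WF',
    #   ddkfile_3 = '/path/to/z_1WF',
    #   wfkfile = '/path/to/gs_WFK',
    #   /
    #
    # followed by the namelists generated from the user-provided optic input.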
4247
4248    def setup(self):
4249        """Public method called before submitting the task."""
4250
4251    def make_links(self):
4252        """
        Optic allows the user to specify the paths of the input files,
        hence we don't need to create symbolic links.
4255        """
4256
4257    def get_results(self, **kwargs):
4258        return super().get_results(**kwargs)
4259
4260    def fix_abicritical(self):
4261        """
4262        Cannot fix abicritical errors for optic
4263        """
4264        return 0
4265
4266    #@check_spectator
4267    def reset_from_scratch(self):
4268        """
        Restart from scratch. This is to be used if a job is restarted with more resources after a crash.
4270        """
        # Move output files produced in workdir to _reset, otherwise check_status continues
        # to see the task as crashed even if the job did not run.
4273        # Create reset directory if not already done.
4274        reset_dir = os.path.join(self.workdir, "_reset")
4275        reset_file = os.path.join(reset_dir, "_counter")
4276        if not os.path.exists(reset_dir):
4277            os.mkdir(reset_dir)
4278            num_reset = 1
4279        else:
4280            with open(reset_file, "rt") as fh:
4281                num_reset = 1 + int(fh.read())
4282
4283        # Move files to reset and append digit with reset index.
4284        def move_file(f):
4285            if not f.exists: return
4286            try:
4287                f.move(os.path.join(reset_dir, f.basename + "_" + str(num_reset)))
4288            except OSError as exc:
4289                self.history.warning("Couldn't move file {}. exc: {}".format(f, str(exc)))
4290
4291        for fname in ("output_file", "log_file", "stderr_file", "qout_file", "qerr_file", "mpiabort_file"):
4292            move_file(getattr(self, fname))
4293
4294        with open(reset_file, "wt") as fh:
4295            fh.write(str(num_reset))
4296
4297        self.start_lockfile.remove()
4298
4299        # Reset datetimes
4300        self.datetimes.reset()
4301
4302        return self._restart(submit=False)
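
    # For illustration (file basenames are hypothetical): after the second reset, the
    # _reset directory would contain something like
    #
    #   run.abo_1, run.log_1, ...   # files moved by the first reset
    #   run.abo_2, run.log_2, ...   # files moved by the second reset
    #   _counter                    # text file containing "2"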
4303
4304    def fix_queue_critical(self):
4305        """
4306        This function tries to fix critical events originating from the queue submission system.
4307
        General strategy: first try to increase resources in order to fix the problem;
        if this is not possible, call a task-specific method to attempt to decrease the demands.
4310
4311        Returns:
4312            1 if task has been fixed else 0.
4313        """
4314        from abipy.flowtk.scheduler_error_parsers import NodeFailureError, MemoryCancelError, TimeCancelError
4315
4316        if not self.queue_errors:
4317            if self.mem_scales or self.load_scales:
4318                try:
4319                    self.manager.increase_resources()  # acts either on the policy or on the qadapter
4320                    self.reset_from_scratch()
4321                    return
4322                except ManagerIncreaseError:
4323                    self.set_status(self.S_ERROR, msg='unknown queue error, could not increase resources any further')
4324                    raise FixQueueCriticalError
4325            else:
4326                self.set_status(self.S_ERROR, msg='unknown queue error, no options left')
4327                raise FixQueueCriticalError
4328
4329        else:
4330            for error in self.queue_errors:
4331                self.history.info('fixing: %s' % str(error))
4332
4333                if isinstance(error, NodeFailureError):
4334                    # if the problematic node is known, exclude it
4335                    if error.nodes is not None:
4336                        try:
4337                            self.manager.exclude_nodes(error.nodes)
4338                            self.reset_from_scratch()
4339                            self.set_status(self.S_READY, msg='excluding nodes')
4340                        except Exception:
4341                            raise FixQueueCriticalError
4342                    else:
4343                        self.set_status(self.S_ERROR, msg='Node error but no node identified.')
4344                        raise FixQueueCriticalError
4345
4346                elif isinstance(error, MemoryCancelError):
                    # Ask the qadapter to provide more resources, i.e. more CPUs so more total memory.
                    # If the code scales, this should fix the memory problem.
                    # Increase both max and min ncpus of the autoparal and rerun autoparal.
4350                    if self.mem_scales:
4351                        try:
4352                            self.manager.increase_ncpus()
4353                            self.reset_from_scratch()
                            self.set_status(self.S_READY, msg='increased ncpus to solve memory problem')
4355                            return
4356                        except ManagerIncreaseError:
4357                            self.history.warning('increasing ncpus failed')
4358
4359                    # if the max is reached, try to increase the memory per cpu:
4360                    try:
4361                        self.manager.increase_mem()
4362                        self.reset_from_scratch()
4363                        self.set_status(self.S_READY, msg='increased mem')
4364                        return
4365                    except ManagerIncreaseError:
4366                        self.history.warning('increasing mem failed')
4367
4368                    # if this failed ask the task to provide a method to reduce the memory demand
4369                    try:
4370                        self.reduce_memory_demand()
4371                        self.reset_from_scratch()
4372                        self.set_status(self.S_READY, msg='decreased mem demand')
4373                        return
4374                    except DecreaseDemandsError:
4375                        self.history.warning('decreasing demands failed')
4376
                    msg = ('Memory error detected but the memory could not be increased, nor could the\n'
                           'memory demand be decreased. Unrecoverable error.')
4379                    self.set_status(self.S_ERROR, msg)
4380                    raise FixQueueCriticalError
4381
4382                elif isinstance(error, TimeCancelError):
4383                    # ask the qadapter to provide more time
4384                    try:
4385                        self.manager.increase_time()
4386                        self.reset_from_scratch()
4387                        self.set_status(self.S_READY, msg='increased wall time')
4388                        return
4389                    except ManagerIncreaseError:
4390                        self.history.warning('increasing the walltime failed')
4391
4392                    # if this fails ask the qadapter to increase the number of cpus
4393                    if self.load_scales:
4394                        try:
4395                            self.manager.increase_ncpus()
4396                            self.reset_from_scratch()
4397                            self.set_status(self.S_READY, msg='increased number of cpus')
4398                            return
4399                        except ManagerIncreaseError:
                            self.history.warning('increasing ncpus to speed up the calculation and stay within the walltime failed')
4401
4402                    # if this failed ask the task to provide a method to speed up the task
4403                    try:
4404                        self.speed_up()
4405                        self.reset_from_scratch()
4406                        self.set_status(self.S_READY, msg='task speedup')
4407                        return
4408                    except DecreaseDemandsError:
4409                        self.history.warning('decreasing demands failed')
4410
                    msg = ('Time cancel error detected but the walltime could not be increased, nor could\n'
                           'the time demand be decreased by speeding up the task or by increasing the number of cpus.\n'
                           'Unrecoverable error.')
4414                    self.set_status(self.S_ERROR, msg)
4415
4416                else:
4417                    msg = 'No solution provided for error %s. Unrecoverable error.' % error.name
4418                    self.set_status(self.S_ERROR, msg)
4419
4420        return 0
4421
4422    def autoparal_run(self):
4423        """
        Find an optimal set of parameters for the execution of the Optic task.
        This method can change the submission parameters, e.g. the number of CPUs for MPI and OpenMP.

        Returns 0 on success.
4428        """
4429        policy = self.manager.policy
4430
4431        if policy.autoparal == 0: # or policy.max_ncpus in [None, 1]:
4432            self.history.info("Nothing to do in autoparal, returning (None, None)")
4433            return 0
4434
4435        if policy.autoparal != 1:
4436            raise NotImplementedError("autoparal != 1")
4437
4438        ############################################################################
        # Run ABINIT sequentially to get the possible configurations with max_ncpus
4440        ############################################################################
4441
4442        # Set the variables for automatic parallelization
4443        # Will get all the possible configurations up to max_ncpus
4444        # Return immediately if max_ncpus == 1
4445        max_ncpus = self.manager.max_cores
4446        if max_ncpus == 1: return 0
4447
4448        autoparal_vars = dict(autoparal=policy.autoparal, max_ncpus=max_ncpus)
4449        self.set_vars(autoparal_vars)
4450
4451        # Run the job in a shell subprocess with mpi_procs = 1
4452        # we don't want to make a request to the queue manager for this simple job!
4453        # Return code is always != 0
4454        process = self.manager.to_shell_manager(mpi_procs=1).launch(self)
4455        self.history.pop()
4456        retcode = process.wait()
4457        # To avoid: ResourceWarning: unclosed file <_io.BufferedReader name=87> in py3k
4458        process.stderr.close()
4459        #process.stdout.close()
4460
4461        # Remove the variables added for the automatic parallelization
4462        self.input.remove_vars(list(autoparal_vars.keys()))
4463
4464        ##############################################################
4465        # Parse the autoparal configurations from the main output file
4466        ##############################################################
4467        parser = ParalHintsParser()
4468        try:
4469            pconfs = parser.parse(self.output_file.path)
4470        except parser.Error:
4471            # In principle Abinit should have written a complete log file
4472            # because we called .wait() but sometimes the Yaml doc is incomplete and
4473            # the parser raises. Let's wait 5 secs and then try again.
4474            time.sleep(5)
4475            try:
4476                pconfs = parser.parse(self.output_file.path)
4477            except parser.Error:
4478                self.history.critical("Error while parsing Autoparal section:\n%s" % straceback())
4479                return 2
4480
4481        ######################################################
4482        # Select the optimal configuration according to policy
4483        ######################################################
4484        #optconf = self.find_optconf(pconfs)
4485        # Select the partition on which we'll be running and set MPI/OMP cores.
4486        optconf = self.manager.select_qadapter(pconfs)
4487
4488        ####################################################
4489        # Change the input file and/or the submission script
4490        ####################################################
4491        self.set_vars(optconf.vars)
4492
4493        # Write autoparal configurations to JSON file.
4494        d = pconfs.as_dict()
4495        d["optimal_conf"] = optconf
4496        json_pretty_dump(d, os.path.join(self.workdir, "autoparal.json"))
4497
4498        ##############
4499        # Finalization
4500        ##############
4501        # Reset the status, remove garbage files ...
        self.set_status(self.S_INIT, msg='finished autoparal run')
4503
4504        # Remove the output file since Abinit likes to create new files
4505        # with extension .outA, .outB if the file already exists.
4506        os.remove(self.output_file.path)
4507        #os.remove(self.log_file.path)
4508        os.remove(self.stderr_file.path)
4509
4510        return 0
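
    # Illustrative sketch (``optic_task`` is a hypothetical OpticTask instance): the
    # autoparal.json file written above can be read back to inspect the configurations
    # and the selected one.
    #
    #   import json
    #   with open(os.path.join(optic_task.workdir, "autoparal.json")) as fh:
    #       data = json.load(fh)
    #   print(data["optimal_conf"])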
4511
4512
4513class AnaddbTask(Task):
4514    """Task for Anaddb runs (post-processing of DFPT calculations)."""
4515
4516    color_rgb = np.array((204, 102, 255)) / 255
4517
4518    def __init__(self, anaddb_input, ddb_node,
4519                 gkk_node=None, md_node=None, ddk_node=None, workdir=None, manager=None):
4520        """
4521        Create an instance of AnaddbTask from a string containing the input.
4522
4523        Args:
4524            anaddb_input: string with the anaddb variables.
4525            ddb_node: The node that will produce the DDB file. Accept |Task|, |Work| or filepath.
4526            gkk_node: The node that will produce the GKK file (optional). Accept |Task|, |Work| or filepath.
4527            md_node: The node that will produce the MD file (optional). Accept |Task|, |Work| or filepath.
            ddk_node: The node that will produce the DDK file (optional). Accept |Task|, |Work| or filepath.
4529            workdir: Path to the working directory (optional).
4530            manager: |TaskManager| object (optional).
4531        """
4532        # Keep a reference to the nodes.
4533        self.ddb_node = Node.as_node(ddb_node)
4534        deps = {self.ddb_node: "DDB"}
4535
4536        self.gkk_node = Node.as_node(gkk_node)
4537        if self.gkk_node is not None:
4538            deps.update({self.gkk_node: "GKK"})
4539
4540        # I never used it!
4541        self.md_node = Node.as_node(md_node)
4542        if self.md_node is not None:
4543            deps.update({self.md_node: "MD"})
4544
4545        self.ddk_node = Node.as_node(ddk_node)
4546        if self.ddk_node is not None:
4547            deps.update({self.ddk_node: "DDK"})
4548
4549        super().__init__(input=anaddb_input, workdir=workdir, manager=manager, deps=deps)
4550
4551    @classmethod
4552    def temp_shell_task(cls, inp, ddb_node, mpi_procs=1,
4553                        gkk_node=None, md_node=None, ddk_node=None, workdir=None, manager=None):
4554        """
        Build an |AnaddbTask| with a temporary workdir. The task is executed via
4556        the shell with 1 MPI proc. Mainly used for post-processing the DDB files.
4557
        Args:
            inp: string with the anaddb variables.
            ddb_node: The node that will produce the DDB file. Accept |Task|, |Work| or filepath.
            mpi_procs: Number of MPI processes to use.

        See `AnaddbTask.__init__` for the meaning of the other arguments.
4564        """
4565        # Build a simple manager to run the job in a shell subprocess
4566        import tempfile
4567        workdir = tempfile.mkdtemp() if workdir is None else workdir
4568        if manager is None: manager = TaskManager.from_user_config()
4569
        # Construct the task (the caller is responsible for running it).
4571        return cls(inp, ddb_node,
4572                   gkk_node=gkk_node, md_node=md_node, ddk_node=ddk_node,
4573                   workdir=workdir, manager=manager.to_shell_manager(mpi_procs=mpi_procs))
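
    # Usage sketch (illustrative; the DDB path is hypothetical): post-process an existing
    # DDB file with a temporary AnaddbTask executed via the shell.
    #
    #   task = AnaddbTask.temp_shell_task(anaddb_inp, ddb_node="/path/to/out_DDB")
    #   task.start_and_wait(autoparal=False)   # start_and_wait is assumed to be provided by Task
    #   with task.open_phbst() as phbst:
    #       print(phbst)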
4574
4575    @property
4576    def executable(self):
4577        """Path to the executable required for running the |AnaddbTask|."""
4578        try:
4579            return self._executable
4580        except AttributeError:
4581            return "anaddb"
4582
4583    @property
4584    def filesfile_string(self):
4585        """String with the list of files and prefixes needed to execute ABINIT."""
4586        lines = []
4587        app = lines.append
4588
4589        app(self.input_file.path)          # 1) Path of the input file
4590        app(self.output_file.path)         # 2) Path of the output file
4591        app(self.ddb_filepath)             # 3) Input derivative database e.g. t13.ddb.in
4592        app(self.md_filepath)              # 4) Output molecular dynamics e.g. t13.md
4593        app(self.gkk_filepath)             # 5) Input elphon matrix elements  (GKK file)
4594        app(self.outdir.path_join("out"))  # 6) Base name for elphon output files e.g. t13
4595        app(self.ddk_filepath)             # 7) File containing ddk filenames for elphon/transport.
4596
4597        return "\n".join(lines)
4598
4599    @property
4600    def ddb_filepath(self):
4601        """Returns (at runtime) the absolute path of the input DDB file."""
        # This is not very elegant! A possible approach could be to use self.ddb_node.outdir.
4603        if isinstance(self.ddb_node, FileNode): return self.ddb_node.filepath
4604        path = self.ddb_node.outdir.has_abiext("DDB")
4605        return path if path else "DDB_FILE_DOES_NOT_EXIST"
4606
4607    @property
4608    def md_filepath(self):
4609        """Returns (at runtime) the absolute path of the input MD file."""
4610        if self.md_node is None: return "MD_FILE_DOES_NOT_EXIST"
4611        if isinstance(self.md_node, FileNode): return self.md_node.filepath
4612
4613        path = self.md_node.outdir.has_abiext("MD")
4614        return path if path else "MD_FILE_DOES_NOT_EXIST"
4615
4616    @property
4617    def gkk_filepath(self):
4618        """Returns (at runtime) the absolute path of the input GKK file."""
4619        if self.gkk_node is None: return "GKK_FILE_DOES_NOT_EXIST"
4620        if isinstance(self.gkk_node, FileNode): return self.gkk_node.filepath
4621
4622        path = self.gkk_node.outdir.has_abiext("GKK")
4623        return path if path else "GKK_FILE_DOES_NOT_EXIST"
4624
4625    @property
4626    def ddk_filepath(self):
4627        """Returns (at runtime) the absolute path of the input DKK file."""
4628        if self.ddk_node is None: return "DDK_FILE_DOES_NOT_EXIST"
4629        if isinstance(self.ddk_node, FileNode): return self.ddk_node.filepath
4630
4631        path = self.ddk_node.outdir.has_abiext("DDK")
4632        return path if path else "DDK_FILE_DOES_NOT_EXIST"
4633
4634    def setup(self):
4635        """Public method called before submitting the task."""
4636
4637    def make_links(self):
4638        """
        Anaddb allows the user to specify the paths of the input files,
        hence we don't need to create symbolic links.
4641        """
4642
4643    def outpath_from_ext(self, ext):
4644        if ext == "anaddb.nc":
4645            path = os.path.join(self.workdir, "anaddb.nc")
4646            if os.path.isfile(path): return path
4647
4648        path = self.wdir.has_abiext(ext)
4649        if not path:
4650            raise RuntimeError("Anaddb task `%s` didn't produce file with extenstion: `%s`" % (self, ext))
4651
4652        return path
4653
4654    def open_phbst(self):
4655        """Open PHBST file produced by Anaddb and returns |PhbstFile| object."""
4656        from abipy.dfpt.phonons import PhbstFile
4657        phbst_path = self.outpath_from_ext("PHBST.nc")
4658        return PhbstFile(phbst_path)
4659
4660    def open_phdos(self):
4661        """Open PHDOS file produced by Anaddb and returns |PhdosFile| object."""
4662        from abipy.dfpt.phonons import PhdosFile
4663        phdos_path = self.outpath_from_ext("PHDOS.nc")
4664        return PhdosFile(phdos_path)
4665
4666    def get_results(self, **kwargs):
4667        return super().get_results(**kwargs)
4668
4669
4670#class BoxcuttedPhononTask(PhononTask):
4671#    """
4672#    This task compute phonons with a two-step algorithm.
4673#    The first DFPT run is done with low-accuracy settings for boxcutmin and ecut
4674#    The second DFPT run uses boxcutmin 2.0 and normal ecut and restarts from
4675#    the 1WFK file generated previously.
4676#    """
4677#    @classmethod
4678#    def patch_flow(cls, flow):
4679#        for task in flow.iflat_tasks():
4680#            if isinstance(task, PhononTask): task.__class__ = cls
4681#
4682#    def setup(self):
4683#        super().setup()
4684#        self.final_dfp_done = False if not hasattr(self, "final_dfp_done") else self.final_dfp_done
4685#        if not self.final_dfp_done:
4686#            # First run: use boxcutmin 1.5 and low-accuracy hints (assume pseudos with hints).
4687#            pseudos = self.input.pseudos
4688#            ecut = max(p.hint_for_accuracy("low").ecut for p in pseudos)
4689#            pawecutdg = max(p.hint_for_accuracy("low").pawecutdg for p in pseudos) if self.input.ispaw else None
4690#            self.set_vars(boxcutmin=1.5, ecut=ecut, pawecutdg=pawecutdg, prtwf=1)
4691#
4692#    def _on_ok(self):
4693#        results = super()._on_ok()
4694#        if not self.final_dfp_done:
4695#            # Second run: use exact box and normal-accuracy hints (assume pseudos with hints).
4696#            pseudos = self.input.pseudos
4697#            ecut = max(p.hint_for_accuracy("normal").ecut for p in pseudos)
4698#            pawecutdg = max(p.hint_for_accuracy("normal").pawecutdg for p in pseudos) if self.input.ispaw else None
4699#            self.set_vars(boxcutmin=2.0, ecut=ecut, pawecutdg=pawecutdg, prtwf=-1)
4700#            self.finalized = True
4701#            self.final_dfp_done = True
4702#            self.restart()
4703#
4704#        return results
4705