# coding: utf-8
"""
Works for Abinit
"""

import os
import shutil
import time
import abc
import collections
import numpy as np

from monty.collections import AttrDict
from monty.itertools import chunks
from monty.functools import lazy_property
from monty.fnmatch import WildCard
from pydispatch import dispatcher
from pymatgen.core.units import EnergyArray
from . import wrappers
from .nodes import Dependency, Node, NodeError, NodeResults, FileNode #, check_spectator
from .tasks import (Task, AbinitTask, ScfTask, NscfTask, DfptTask, PhononTask, ElasticTask, DdkTask, EffMassTask,
                    BseTask, RelaxTask, DdeTask, BecTask, ScrTask, SigmaTask, TaskManager,
                    DteTask, EphTask, CollinearThenNonCollinearScfTask)

from .utils import Directory
from .netcdf import ETSF_Reader, NetcdfReader
from .abitimer import AbinitTimerParser

__author__ = "Matteo Giantomassi"
__copyright__ = "Copyright 2013, The Materials Project"
__version__ = "0.1"
__maintainer__ = "Matteo Giantomassi"


__all__ = [
    "Work",
    "BandStructureWork",
    "RelaxWork",
    "G0W0Work",
    "QptdmWork",
    "SigmaConvWork",
    "BseMdfWork",
    "PhononWork",
    "PhononWfkqWork",
    "GKKPWork",
    "BecWork",
    "DteWork",
    "ConducWork",
]


class WorkResults(NodeResults):
    JSON_SCHEMA = NodeResults.JSON_SCHEMA.copy()

    @classmethod
    def from_node(cls, work):
        """Initialize an instance from a |Work| instance."""
        new = super().from_node(work)

        # Will put all files found in outdir in GridFS.
        # Warning: assuming binary files.
        d = {os.path.basename(f): f for f in work.outdir.list_filepaths()}
        new.register_gridfs_files(**d)

        return new


class WorkError(NodeError):
    """Base class for the exceptions raised by Work objects."""


class BaseWork(Node, metaclass=abc.ABCMeta):
    """
    .. rubric:: Inheritance Diagram
    .. inheritance-diagram:: BaseWork
    """
    Error = WorkError

    Results = WorkResults

    # interface modeled after subprocess.Popen
    @property
    @abc.abstractmethod
    def processes(self):
        """Return a list of objects that support the `subprocess.Popen` protocol."""

    def poll(self):
        """
        Check if all child processes have terminated. Set and return the returncode attributes.
        """
        return [task.poll() for task in self]

    def wait(self):
        """
        Wait for the child processes to terminate. Set and return the returncode attributes.
        """
        return [task.wait() for task in self]

    def communicate(self, input=None):
        """
        Interact with processes: send data to stdin and read data from stdout and
        stderr, until end-of-file is reached.
        Wait for the processes to terminate. The optional input argument should be a
        string to be sent to the child processes, or None, if no data should be
        sent to the children.

        communicate() returns a list of tuples (stdoutdata, stderrdata).
        """
        return [task.communicate(input) for task in self]

    @property
    def returncodes(self):
        """
        The children return codes, set by poll() and wait() (and indirectly by communicate()).
        A None value indicates that the process hasn't terminated yet.
        A negative value -N indicates that the child was terminated by signal N (Unix only).
        """
        return [task.returncode for task in self]

    @property
    def ncores_reserved(self):
        """
        Returns the number of cores reserved at this moment.
        A core is reserved if the task is not running yet but
        we have already submitted it to the queue manager.
        """
        return sum(task.manager.num_cores for task in self if task.status == task.S_SUB)

    @property
    def ncores_allocated(self):
        """
        Returns the number of cores allocated at this moment.
        A core is allocated if it's running a task or if we have
        submitted a task to the queue manager but the job is still pending.
        """
        return sum(task.manager.num_cores for task in self if task.status in [task.S_SUB, task.S_RUN])

    @property
    def ncores_used(self):
        """
        Returns the number of cores used at this moment.
        A core is used if there's a job running on it.
        """
        return sum(task.manager.num_cores for task in self if task.status == task.S_RUN)
    def fetch_task_to_run(self):
        """
        Returns the first task that is ready to run or
        None if no task can be submitted at present.

        Raises:
            `StopIteration` if all tasks are done.
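
        Example:

            A minimal client-side polling loop (sketch; ``work`` is a |Work| instance):

                try:
                    task = work.fetch_task_to_run()
                    if task is not None:
                        task.start()
                except StopIteration:
                    print("All tasks completed.")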
        """
        # All the tasks are done so raise an exception
        # that will be handled by the client code.
        if all(task.is_completed for task in self):
            raise StopIteration("All tasks completed.")

        for task in self:
            if task.can_run:
                return task

        # No task found, this usually happens when we have dependencies.
        # Beware of possible deadlocks here!
        self.history.warning("Possible deadlock in fetch_task_to_run!")
        return None

    def fetch_alltasks_to_run(self):
        """
        Returns a list with all the tasks that can be submitted.
        Empty list if no task can be submitted at present.
        """
        return [task for task in self if task.can_run]

    @abc.abstractmethod
    def setup(self, *args, **kwargs):
        """Method called before submitting the calculations."""

    def _setup(self, *args, **kwargs):
        self.setup(*args, **kwargs)

    def connect_signals(self):
        """
        Connect the signals within the work.
        The |Work| is responsible for catching the important signals raised by
        its tasks and raising new signals when some particular condition occurs.
        """
        for task in self:
            dispatcher.connect(self.on_ok, signal=task.S_OK, sender=task)

    def disconnect_signals(self):
        """
        Disable the signals within the work. This function reverses the process of `connect_signals`.
        """
        for task in self:
            try:
                dispatcher.disconnect(self.on_ok, signal=task.S_OK, sender=task)
            except dispatcher.errors.DispatcherKeyError as exc:
                self.history.debug(str(exc))

    @property
    def all_ok(self):
        return all(task.status == task.S_OK for task in self)

    #@check_spectator
    def on_ok(self, sender):
        """
        This callback is called when one task reaches status `S_OK`.
        It executes on_all_ok when all tasks in self have reached `S_OK`.
        """
        self.history.debug("In on_ok with sender %s" % sender)

        if self.all_ok:
            if self.finalized:
                return AttrDict(returncode=0, message="Work has already been finalized")
            else:
                # Set finalized here, because on_all_ok might change it (e.g. Relax + EOS in a single work)
                self.finalized = True
                try:
                    results = AttrDict(**self.on_all_ok())
                except Exception as exc:
                    self.history.critical("on_all_ok raises %s" % str(exc))
                    self.finalized = False
                    raise

                # Signal to possible observers that the `Work` reached S_OK
                self.history.info("Work %s is finalized and broadcasts signal S_OK" % str(self))
                if self._finalized:
                    self.send_signal(self.S_OK)

                return results

        return AttrDict(returncode=1, message="Not all tasks are OK!")

    #@check_spectator
    def on_all_ok(self):
        """
        This method is called once the `Work` is completed i.e. when all tasks have reached status S_OK.
        Subclasses should provide their own implementation.

        Returns:
            Dictionary that must contain at least the following entries:
                returncode: 0 on success.
                message: a string that should provide a human-readable description of what has been performed.
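
        Example:

            Sketch of a subclass hook (hypothetical; mirrors what DFPT works
            do with the DDB files produced by their tasks):

                def on_all_ok(self):
                    out_ddb = self.merge_ddb_files()
                    return dict(returncode=0, message="DDB files merged")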
        """
        return dict(returncode=0, message="Calling on_all_ok of the base class!")

    def get_results(self, **kwargs):
        """
        Method called once the calculations are completed.
        The base version returns a dictionary task_name: TaskResults for each task in self.
        """
        results = self.Results.from_node(self)
        return results

    def get_graphviz(self, engine="automatic", graph_attr=None, node_attr=None, edge_attr=None):
        """
        Generate task graph in the DOT language (only parents and children of this work).

        Args:
            engine: Layout command used. ['dot', 'neato', 'twopi', 'circo', 'fdp', 'sfdp', 'patchwork', 'osage']
            graph_attr: Mapping of (attribute, value) pairs for the graph.
            node_attr: Mapping of (attribute, value) pairs set for all nodes.
            edge_attr: Mapping of (attribute, value) pairs set for all edges.

        Returns: graphviz.Digraph <https://graphviz.readthedocs.io/en/stable/api.html#digraph>
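
        Example:

            Sketch (requires the graphviz package and the Graphviz executables):

                graph = work.get_graphviz(engine="dot")
                graph.render("work_graph", view=False)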
        """
        from graphviz import Digraph
        fg = Digraph("work", #filename="work_%s.gv" % os.path.basename(self.workdir),
            engine="fdp" if engine == "automatic" else engine)

        # Set graph attributes.
        # https://www.graphviz.org/doc/info/
        #fg.attr(label="%s@%s" % (self.__class__.__name__, self.relworkdir))
        fg.attr(label=repr(self))
        #fg.attr(fontcolor="white", bgcolor='purple:pink')
        fg.attr(rankdir="LR", pagedir="BL")
        #fg.attr(constraint="false", pack="true", packMode="clust")
        fg.node_attr.update(color='lightblue2', style='filled')
        #fg.node_attr.update(ranksep='equally')

        # Add input attributes.
        if graph_attr is not None:
            fg.graph_attr.update(**graph_attr)
        if node_attr is not None:
            fg.node_attr.update(**node_attr)
        if edge_attr is not None:
            fg.edge_attr.update(**edge_attr)

        def node_kwargs(node):
            return dict(
                #shape="circle",
                color=node.color_hex,
                label=(str(node) if not hasattr(node, "pos_str") else
                    node.pos_str + "\n" + node.__class__.__name__),
            )

        edge_kwargs = dict(arrowhead="vee", style="solid")
        cluster_kwargs = dict(rankdir="LR", pagedir="BL", style="rounded", bgcolor="azure2")

        # Build cluster with tasks in *this* work
        cluster_name = "cluster%s" % self.name
        with fg.subgraph(name=cluster_name) as wg:
            wg.attr(**cluster_kwargs)
            wg.attr(label="%s (%s)" % (self.__class__.__name__, self.name))
            for task in self:
                wg.node(task.name, **node_kwargs(task))
                # Connect task to children.
                for child in task.get_children():
                    # Test if child is in this cluster (self).
                    myg = wg if child in self else fg
                    myg.node(child.name, **node_kwargs(child))
                    # Find file extensions required by this task.
                    i = [dep.node for dep in child.deps].index(task)
                    edge_label = "+".join(child.deps[i].exts)
                    myg.edge(task.name, child.name, label=edge_label, color=task.color_hex,
                             **edge_kwargs)

                # Connect task to parents.
                for parent in task.get_parents():
                    # Test if parent is in this cluster (self).
                    myg = wg if parent in self else fg
                    myg.node(parent.name, **node_kwargs(parent))
                    # Find file extensions required by this task.
                    i = [dep.node for dep in task.deps].index(parent)
                    edge_label = "+".join(task.deps[i].exts)
                    myg.edge(parent.name, task.name, label=edge_label, color=parent.color_hex,
                             **edge_kwargs)

        # Treat the case in which we have a work producing output for tasks in *this* work.
        #for work in self.flow:
        #    children = work.get_children()
        #    if not children or all(child not in self for child in children):
        #        continue
        #    cluster_name = "cluster%s" % work.name
        #    seen = set()
        #    for child in children:
        #        if child not in self: continue
        #        # This is not needed, too confusing.
        #        #fg.edge(cluster_name, child.name, color=work.color_hex, **edge_kwargs)
        #        # Find file extensions required by work
        #        i = [dep.node for dep in child.deps].index(work)
        #        for ext in child.deps[i].exts:
        #            out = "%s (%s)" % (ext, work.name)
        #            fg.node(out)
        #            fg.edge(out, child.name, **edge_kwargs)
        #            key = (cluster_name, out)
        #            if key not in seen:
        #                fg.edge(cluster_name, out, color=work.color_hex, **edge_kwargs)
        #                seen.add(key)

        return fg


class NodeContainer(metaclass=abc.ABCMeta):
    """
    Mixin class for `Work` and `Flow` objects providing helper functions
    to register tasks in the container. The helper functions call the
    `register_task` method of the container.
    """
    # Abstract protocol for containers

    @abc.abstractmethod
    def register_task(self, *args, **kwargs):
        """
        Register a task in the container.
        """
        # TODO: shall flow.register_task return a Task or a Work?

    # Helper functions to register Task subclasses.
    def register_scf_task(self, *args, **kwargs):
        """Register an SCF task."""
        kwargs["task_class"] = ScfTask
        return self.register_task(*args, **kwargs)

    def register_collinear_then_noncollinear_scf_task(self, *args, **kwargs):
        """Register an SCF task that performs a collinear SCF run (nsppol = 2) followed by a non-collinear one (nspinor = 2)."""
        kwargs["task_class"] = CollinearThenNonCollinearScfTask
        return self.register_task(*args, **kwargs)

    def register_nscf_task(self, *args, **kwargs):
        """Register an NSCF task."""
        kwargs["task_class"] = NscfTask
        task = self.register_task(*args, **kwargs)

        # Make sure a parent producing the DEN file is given.
        if task.is_work: task = task[-1]
        den_parent = task.find_parent_with_ext("DEN")
        if den_parent is None:
            raise ValueError("NSCF task %s\nrequires a parent producing a DEN file!" % repr(task))

        if task.input.get("usekden", 0) == 1:
            # Meta-GGA calculation --> Add KDEN if not explicitly given.
            # Assuming prtkden is already set to 1.
            # TODO: Abinit should automatically set it to 1 if usekden --> I'm not gonna fix the input at this level
            kden_parent = task.find_parent_with_ext("KDEN")
            if kden_parent is None:
                task.add_deps({den_parent: "KDEN"})

        return task
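
    # Example (sketch): an NSCF task needs a parent that produces the DEN file:
    #
    #     scf_task = work.register_scf_task(scf_input)
    #     work.register_nscf_task(nscf_input, deps={scf_task: "DEN"})
    #
    # Registering an NSCF task without such a parent raises ValueError.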

    def register_relax_task(self, *args, **kwargs):
        """Register a task for structural optimization."""
        kwargs["task_class"] = RelaxTask
        return self.register_task(*args, **kwargs)

    def register_phonon_task(self, *args, **kwargs):
        """Register a phonon task."""
        kwargs["task_class"] = PhononTask
        return self.register_task(*args, **kwargs)

    def register_elastic_task(self, *args, **kwargs):
        """Register an elastic task."""
        kwargs["task_class"] = ElasticTask
        return self.register_task(*args, **kwargs)

    def register_ddk_task(self, *args, **kwargs):
        """Register a DDK task."""
        kwargs["task_class"] = DdkTask
        return self.register_task(*args, **kwargs)

    def register_effmass_task(self, *args, **kwargs):
        """Register an effective-mass task."""
        kwargs["task_class"] = EffMassTask
        # FIXME: Hack to run it in sequential because the effmass task does not support parallelism.
        kwargs.update({"manager": TaskManager.from_user_config().new_with_fixed_mpi_omp(1, 1)})
        return self.register_task(*args, **kwargs)

    def register_scr_task(self, *args, **kwargs):
        """Register a screening task."""
        kwargs["task_class"] = ScrTask
        return self.register_task(*args, **kwargs)

    def register_sigma_task(self, *args, **kwargs):
        """Register a sigma task."""
        kwargs["task_class"] = SigmaTask
        return self.register_task(*args, **kwargs)

    def register_dde_task(self, *args, **kwargs):
        """Register a DDE task."""
        kwargs["task_class"] = DdeTask
        return self.register_task(*args, **kwargs)

    def register_dte_task(self, *args, **kwargs):
        """Register a DTE task."""
        kwargs["task_class"] = DteTask
        return self.register_task(*args, **kwargs)

    def register_bec_task(self, *args, **kwargs):
        """Register a BEC task."""
        kwargs["task_class"] = BecTask
        return self.register_task(*args, **kwargs)

    def register_bse_task(self, *args, **kwargs):
        """Register a Bethe-Salpeter task."""
        kwargs["task_class"] = BseTask
        return self.register_task(*args, **kwargs)

    def register_eph_task(self, *args, **kwargs):
        """Register an electron-phonon task."""
        kwargs["task_class"] = EphTask
        eph_inp = args[0]
        seq_manager = TaskManager.from_user_config().new_with_fixed_mpi_omp(1, 1)
        if eph_inp.get("eph_frohlichm", 0) != 0 or abs(eph_inp.get("eph_task", 0)) == 15:
            # FIXME: Hack to run the task in sequential if the calculation does not support MPI with nprocs > 1.
            kwargs.update({"manager": seq_manager})

        return self.register_task(*args, **kwargs)

    def walknset_vars(self, task_class=None, *args, **kwargs):
        """
        Set the values of the ABINIT variables in the input files of the nodes.

        Args:
            task_class: If not None, only the input files of the tasks belonging
                to class `task_class` are modified.

        Example:

            flow.walknset_vars(ecut=10, kptopt=4)
        """
        def change_task(task):
            if task_class is not None and task.__class__ is not task_class: return False
            return True

        if self.is_work:
            for task in self:
                if not change_task(task): continue
                task.set_vars(*args, **kwargs)

        elif self.is_flow:
            for task in self.iflat_tasks():
                if not change_task(task): continue
                task.set_vars(*args, **kwargs)

        else:
            raise TypeError("Don't know how to set variables for object class %s" % self.__class__.__name__)


class Work(BaseWork, NodeContainer):
    """
    A Work is a list of (possibly connected) tasks.

    .. rubric:: Inheritance Diagram
    .. inheritance-diagram:: Work
    """
    def __init__(self, workdir=None, manager=None):
        """
        Args:
            workdir: Path to the working directory.
            manager: |TaskManager| object.
        """
        super().__init__()

        self._tasks = []

        if workdir is not None:
            self.set_workdir(workdir)

        if manager is not None:
            self.set_manager(manager)

    def set_manager(self, manager):
        """Set the |TaskManager| used to launch the |Task|."""
        self.manager = manager.deepcopy()
        for task in self:
            task.set_manager(manager)

    @property
    def flow(self):
        """The flow containing this |Work|."""
        return self._flow

    def set_flow(self, flow):
        """Set the flow associated to this |Work|."""
        if not hasattr(self, "_flow"):
            self._flow = flow
        else:
            if self._flow != flow:
                raise ValueError("self._flow != flow")

    @lazy_property
    def pos(self):
        """The position of self in the |Flow|."""
        for i, work in enumerate(self.flow):
            if self == work:
                return i
        raise ValueError("Cannot find the position of %s in flow %s" % (self, self.flow))

    @property
    def pos_str(self):
        """String representation of self.pos."""
        return "w" + str(self.pos)

    def set_workdir(self, workdir, chroot=False):
        """Set the working directory. Cannot be set more than once unless chroot is True."""
        if not chroot and hasattr(self, "workdir") and self.workdir != workdir:
            raise ValueError("self.workdir != workdir: %s, %s" % (self.workdir, workdir))

        self.workdir = os.path.abspath(workdir)

        # Directories with (input|output|temporary) data.
        # The work will use these directories to connect
        # itself to other works and/or to produce new data
        # that will be used by its children.
        self.indir = Directory(os.path.join(self.workdir, "indata"))
        self.outdir = Directory(os.path.join(self.workdir, "outdata"))
        self.tmpdir = Directory(os.path.join(self.workdir, "tmpdata"))
        self.wdir = Directory(self.workdir)

    def chroot(self, new_workdir):
        self.set_workdir(new_workdir, chroot=True)

        for i, task in enumerate(self):
            new_tdir = os.path.join(self.workdir, "t" + str(i))
            task.set_workdir(new_tdir, chroot=True)

    def __len__(self):
        return len(self._tasks)

    def __iter__(self):
        return self._tasks.__iter__()

    def __getitem__(self, slice):
        return self._tasks[slice]

    def chunks(self, chunk_size):
        """Yield successive chunks of tasks of length chunk_size."""
        for tasks in chunks(self, chunk_size):
            yield tasks

    def ipath_from_ext(self, ext):
        """
        Returns the path of the input file with extension ext.
        Use it when the file does not exist yet.
        """
        return self.indir.path_in("in_" + ext)

    def opath_from_ext(self, ext):
        """
        Returns the path of the output file with extension ext.
        Use it when the file does not exist yet.
        """
        return self.outdir.path_in("out_" + ext)

    @property
    def processes(self):
        return [task.process for task in self]

    @property
    def all_done(self):
        """True if all the |Task| objects in the |Work| are done."""
        return all(task.status >= task.S_DONE for task in self)

    @property
    def isnc(self):
        """True if norm-conserving calculation."""
        return all(task.isnc for task in self)

    @property
    def ispaw(self):
        """True if PAW calculation."""
        return all(task.ispaw for task in self)

    @property
    def status_counter(self):
        """
        Returns a `Counter` object that counts the number of tasks with
        given status (use the string representation of the status as key).
        """
        counter = collections.Counter()

        for task in self:
            counter[str(task.status)] += 1

        return counter

    def allocate(self, manager=None):
        """
        This function is called once we have completed the initialization
        of the |Work|. It sets the manager of each task (if not already done)
        and defines the working directories of the tasks.

        Args:
            manager: |TaskManager| object or None
        """
        for i, task in enumerate(self):

            if not hasattr(task, "manager"):
                # Set the manager.
                # Use the one provided as argument, else the one of the work/flow.
                if manager is not None:
                    task.set_manager(manager)
                else:
                    # Look first in the work and then in the flow.
                    if hasattr(self, "manager"):
                        task.set_manager(self.manager)
                    else:
                        task.set_manager(self.flow.manager)

            task_workdir = os.path.join(self.workdir, "t" + str(i))

            if not hasattr(task, "workdir"):
                task.set_workdir(task_workdir)
            else:
                if task.workdir != task_workdir:
                    raise ValueError("task.workdir != task_workdir: %s, %s" % (task.workdir, task_workdir))

    def register(self, obj, deps=None, required_files=None, manager=None, task_class=None):
        """
        Registers a new |Task| and adds it to the internal list, taking into account possible dependencies.

        Args:
            obj: |AbinitInput| instance or |Task| object.
            deps: Dictionary specifying the dependencies of this node, or list of dependencies.
                None means that this obj has no dependency.
            required_files: List of strings with the paths of the files used by the task.
                Note that the files must exist when the task is registered.
                Use the standard approach based on Works, Tasks and deps
                if the files will be produced in the future.
            manager:
                The |TaskManager| responsible for the submission of the task. If manager is None, we use
                the `TaskManager` specified during the creation of the |Work|.
            task_class: Task subclass to instantiate. Default: :class:`AbinitTask`

        Returns: |Task| object
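
        Example:

            Sketch: register two tasks, the second depending on the DEN file of the first:

                scf_task = work.register(scf_input)
                nscf_task = work.register(nscf_input, deps={scf_task: "DEN"}, task_class=NscfTask)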
        """
        task_workdir = None
        if hasattr(self, "workdir"):
            task_workdir = os.path.join(self.workdir, "t" + str(len(self)))

        if isinstance(obj, Task):
            task = obj

        else:
            # Set the class.
            if task_class is None:
                task_class = AbinitTask

            task = task_class.from_input(obj, task_workdir, manager)

        self._tasks.append(task)

        # Handle possible dependencies given either as dict or list.
        if deps is not None:
            if hasattr(deps, "items"):
                deps = [Dependency(node, exts) for node, exts in deps.items()]
            task.add_deps(deps)

        # Handle possible required files.
        if required_files is not None:
            task.add_required_files(required_files)

        return task

    # Needed by NodeContainer
    register_task = register

    def path_in_workdir(self, filename):
        """Create the absolute path of filename in the working directory."""
        return os.path.join(self.workdir, filename)

    def setup(self, *args, **kwargs):
        """
        Method called before running the calculations.
        The default implementation is empty.
        """

    def build(self, *args, **kwargs):
        """Creates the top level directory."""
        # Create the directories of the work.
        self.indir.makedirs()
        self.outdir.makedirs()
        self.tmpdir.makedirs()

        # Build dirs and files of each task.
        for task in self:
            task.build(*args, **kwargs)

        # Connect signals within the work.
        self.connect_signals()

    @property
    def status(self):
        """
        Returns the status of the work i.e. the minimum of the status of the tasks.
        """
        return self.get_all_status(only_min=True)

    def get_all_status(self, only_min=False):
        """
        Returns a list with the status of the tasks in self.

        Args:
            only_min: If True, the minimum of the status is returned.
        """
        if len(self) == 0:
            # The work will be created in the future.
            if only_min:
                return self.S_INIT
            else:
                return [self.S_INIT]

        self.check_status()
        status_list = [task.status for task in self]

        if only_min:
            return min(status_list)
        else:
            return status_list

    def check_status(self):
        """Check the status of the tasks."""
        # Recompute the status of the tasks
        # Ignore OK and LOCKED tasks.
        for task in self:
            if task.status in (task.S_OK, task.S_LOCKED): continue
            task.check_status()

        # Take into account possible dependencies. Use a list instead of generators
        for task in self:
            if task.status == task.S_LOCKED: continue
            if task.status < task.S_SUB and all(status == task.S_OK for status in task.deps_status):
                task.set_status(task.S_READY, "Status set to Ready")

    def rmtree(self, exclude_wildcard=""):
        """
        Remove all files and directories in the working directory.

        Args:
            exclude_wildcard: Optional string with shell patterns separated by `|`.
                Files matching one of the patterns will be preserved.
                Example: exclude_wildcard="*.nc|*.txt" preserves all the files
                whose extension is in ["nc", "txt"].
        """
        if not exclude_wildcard:
            shutil.rmtree(self.workdir)

        else:
            w = WildCard(exclude_wildcard)
            for dirpath, dirnames, filenames in os.walk(self.workdir):
                for fname in filenames:
                    path = os.path.join(dirpath, fname)
                    if not w.match(fname):
                        os.remove(path)

    def rm_indatadir(self):
        """Remove all the indata directories."""
        for task in self:
            task.rm_indatadir()

    def rm_outdatadir(self):
        """Remove all the outdata directories."""
        for task in self:
            task.rm_outdatadir()

    def rm_tmpdatadir(self):
        """Remove all the tmpdata directories."""
        for task in self:
            task.rm_tmpdatadir()

    def move(self, dest, isabspath=False):
        """
        Recursively move self.workdir to another location. This is similar to the Unix "mv" command.
        The destination path must not already exist. If the destination already exists
        but is not a directory, it may be overwritten depending on os.rename() semantics.

        By default, dest is located in the parent directory of self.workdir; use isabspath=True
        to specify an absolute path.
        """
        if not isabspath:
            dest = os.path.join(os.path.dirname(self.workdir), dest)

        shutil.move(self.workdir, dest)

    def submit_tasks(self, wait=False):
        """
        Submit the tasks in self; wait for completion if wait is True.
        TODO: change name.
        """
        for task in self:
            task.start()

        if wait:
            for task in self: task.wait()

    def start(self, *args, **kwargs):
        """
        Start the work. Calls build and _setup first, then submits the tasks.
        Non-blocking call unless wait is set to True.
        """
        wait = kwargs.pop("wait", False)

        # Initial setup
        self._setup(*args, **kwargs)

        # Build dirs and files.
        self.build(*args, **kwargs)

        # Submit tasks (does not block)
        self.submit_tasks(wait=wait)

    def read_etotals(self, unit="Ha"):
        """
        Reads the total energy from the GSR file produced by each task.

        Returns a numpy array with the total energies converted to the specified unit.
        The array element is set to np.inf if an exception is raised while reading the GSR file.
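
        Example:

            Sketch (all tasks must be done):

                etotals_ev = work.read_etotals(unit="eV")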
        """
        if not self.all_done:
            raise self.Error("Some task is still in running/submitted state")

        etotals = []
        for task in self:
            # Open the GSR file and read etotal (Hartree)
            gsr_path = task.outdir.has_abiext("GSR")
            etot = np.inf
            if gsr_path:
                with ETSF_Reader(gsr_path) as r:
                    etot = r.read_value("etotal")

            etotals.append(etot)

        return EnergyArray(etotals, "Ha").to(unit)

    def parse_timers(self):
        """
        Parse the TIMER section reported in the ABINIT output files.

        Returns:
            :class:`AbinitTimerParser` object
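
        Example:

            Sketch (assuming summarize returns a pandas DataFrame):

                parser = work.parse_timers()
                df = parser.summarize()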
        """
        filenames = list(filter(os.path.exists, [task.output_file.path for task in self]))

        parser = AbinitTimerParser()
        parser.parse(filenames)

        return parser


class BandStructureWork(Work):
    """
    Work for band structure calculations.

    .. rubric:: Inheritance Diagram
    .. inheritance-diagram:: BandStructureWork
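
    Example:

        Sketch (inputs are |AbinitInput| objects built by the user):

            work = BandStructureWork(scf_input, nscf_input, dos_inputs=[dos_input])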
    """

    def __init__(self, scf_input, nscf_input, dos_inputs=None, workdir=None, manager=None):
        """
        Args:
            scf_input: Input for the SCF run.
            nscf_input: Input for the NSCF run defining the band structure calculation.
            dos_inputs: Input(s) for the DOS. DOS is computed only if dos_inputs is not None.
            workdir: Working directory.
            manager: |TaskManager| object.
        """
        super().__init__(workdir=workdir, manager=manager)

        # Register the GS-SCF run.
        self.scf_task = self.register_scf_task(scf_input)

        # Register the NSCF run and its dependency.
        self.nscf_task = self.register_nscf_task(nscf_input, deps={self.scf_task: "DEN"})

        # Add DOS computation(s) if requested.
        self.dos_tasks = []
        if dos_inputs is not None:
            if not isinstance(dos_inputs, (list, tuple)):
                dos_inputs = [dos_inputs]

            for dos_input in dos_inputs:
                dos_task = self.register_nscf_task(dos_input, deps={self.scf_task: "DEN"})
                self.dos_tasks.append(dos_task)

    def plot_ebands(self, **kwargs):
        """
        Plot the band structure. kwargs are passed to the plot method of |ElectronBands|.

        Return: |matplotlib-Figure|
        """
        with self.nscf_task.open_gsr() as gsr:
            return gsr.ebands.plot(**kwargs)

    def plot_ebands_with_edos(self, dos_pos=0, method="gaussian", step=0.01, width=0.1, **kwargs):
        """
        Plot the band structure and the DOS.

        Args:
            dos_pos: Index of the task from which the DOS should be obtained (note: 0 refers to the first DOS task).
            method: String defining the method for the computation of the DOS.
            step: Energy step (eV) of the linear mesh.
            width: Standard deviation (eV) of the gaussian.
            kwargs: Keyword arguments passed to `plot_with_edos` method to customize the plot.

        Return: |matplotlib-Figure|
        """
        with self.nscf_task.open_gsr() as gsr:
            gs_ebands = gsr.ebands

        with self.dos_tasks[dos_pos].open_gsr() as gsr:
            dos_ebands = gsr.ebands

        edos = dos_ebands.get_edos(method=method, step=step, width=width)
        return gs_ebands.plot_with_edos(edos, **kwargs)

    def plot_edoses(self, dos_pos=None, method="gaussian", step=0.01, width=0.1, **kwargs):
        """
        Plot the electronic DOSes.

        Args:
            dos_pos: Index of the task from which the DOS should be obtained.
                     If None, all DOSes are displayed. Accepts integer or list of integers.
            method: String defining the method for the computation of the DOS.
            step: Energy step (eV) of the linear mesh.
            width: Standard deviation (eV) of the gaussian.
            kwargs: Keyword arguments passed to `plot` method to customize the plot.

        Return: |matplotlib-Figure|
        """
        if dos_pos is not None and not isinstance(dos_pos, (list, tuple)): dos_pos = [dos_pos]

        from abipy.electrons.ebands import ElectronDosPlotter
        plotter = ElectronDosPlotter()
        for i, task in enumerate(self.dos_tasks):
            if dos_pos is not None and i not in dos_pos: continue
            with task.open_gsr() as gsr:
                edos = gsr.ebands.get_edos(method=method, step=step, width=width)
                ngkpt = task.get_inpvar("ngkpt")
                plotter.add_edos("ngkpt %s" % str(ngkpt), edos)

        return plotter.combiplot(**kwargs)


class RelaxWork(Work):
    """
    Work for structural relaxations. The first task relaxes the atomic positions
    while keeping the unit cell parameters fixed. The second task uses the final
    structure to perform a structural relaxation in which both the atomic positions
    and the lattice parameters are optimized.

    .. rubric:: Inheritance Diagram
    .. inheritance-diagram:: RelaxWork
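
    Example:

        Sketch: relax the ions first, then ions and cell, progressively reducing
        dilatmx to 1.01 (value chosen for illustration):

            work = RelaxWork(ion_input, ioncell_input, target_dilatmx=1.01)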
    """
    def __init__(self, ion_input, ioncell_input, workdir=None, manager=None, target_dilatmx=None):
        """
        Args:
            ion_input: Input for the relaxation of the ions (cell is fixed).
            ioncell_input: Input for the relaxation of the ions and the unit cell.
            workdir: Working directory.
            manager: |TaskManager| object.
            target_dilatmx: Target value of the dilatmx input variable. If set, dilatmx is
                progressively reduced (with a restart) after each relaxation until this value is reached.
        """
        super().__init__(workdir=workdir, manager=manager)

        self.ion_task = self.register_relax_task(ion_input)

        # Note:
        #   1) It would be nice to restart from the WFK file but ABINIT crashes due to the
        #      different unit cell parameters if paral_kgb == 1
        #paral_kgb = ion_input[0]["paral_kgb"]
        #if paral_kgb == 1:

        #deps = {self.ion_task: "WFK"}  # --> FIXME: Problem in rwwf
        #deps = {self.ion_task: "DEN"}
        deps = None

        self.ioncell_task = self.register_relax_task(ioncell_input, deps=deps)

        # Lock ioncell_task as ion_task should communicate to ioncell_task that
        # the calculation is OK and pass the final structure.
        self.ioncell_task.lock(source_node=self)
        self.transfer_done = False

        self.target_dilatmx = target_dilatmx

    #@check_spectator
    def on_ok(self, sender):
        """
        This callback is called when one task reaches status S_OK.
        If sender == self.ion_task, we update the initial structure
        used by self.ioncell_task and we unlock it so that the job can be submitted.
        """
        self.history.debug("In on_ok with sender %s" % sender)

        if sender == self.ion_task and not self.transfer_done:
            # Get the relaxed structure from ion_task
            ion_structure = self.ion_task.get_final_structure()

            # Transfer it to the ioncell task (we do it only once).
            self.ioncell_task._change_structure(ion_structure)
            self.transfer_done = True

            # Unlock ioncell_task so that we can submit it.
            self.ioncell_task.unlock(source_node=self)

        elif sender == self.ioncell_task and self.target_dilatmx:
            actual_dilatmx = self.ioncell_task.get_inpvar('dilatmx', 1.)
            if self.target_dilatmx < actual_dilatmx:
                self.ioncell_task.reduce_dilatmx(target=self.target_dilatmx)
                self.history.info('Converging dilatmx. Value reduced from {} to {}.'
                            .format(actual_dilatmx, self.ioncell_task.get_inpvar('dilatmx')))
                self.ioncell_task.restart()

        return super().on_ok(sender)

    def plot_ion_relaxation(self, **kwargs):
        """
        Plot the history of the ion relaxation.
        kwargs are passed to the plot method of |HistFile|.

        Return: |matplotlib-Figure| or None if hist file is not found.
        """
        with self.ion_task.open_hist() as hist:
            return hist.plot(**kwargs) if hist else None

    def plot_ioncell_relaxation(self, **kwargs):
        """
        Plot the history of the ion + cell relaxation.
        kwargs are passed to the plot method of |HistFile|.

        Return: |matplotlib-Figure| or None if hist file is not found.
        """
        with self.ioncell_task.open_hist() as hist:
            return hist.plot(**kwargs) if hist else None


class G0W0Work(Work):
    """
    Work for general G0W0 calculations.
    All inputs can be given either as single inputs or as lists of inputs.

    .. rubric:: Inheritance Diagram
    .. inheritance-diagram:: G0W0Work
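
    Example:

        Sketch: a single screening shared by one or more self-energy inputs:

            work = G0W0Work(scf_input, nscf_input, scr_input, sigma_inputs)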
    """
    def __init__(self, scf_inputs, nscf_inputs, scr_inputs, sigma_inputs,
                 workdir=None, manager=None):
        """
        Args:
            scf_inputs: Input(s) for the SCF run. If a list is given, all inputs are registered
                but only the last one is linked to the NSCF run (used for convergence studies on the KS band gap).
            nscf_inputs: Input(s) for the NSCF run. If a list is given, all inputs are registered
                but only the last one is linked to the subsequent runs (e.g. additional DOS and bands calculations).
            scr_inputs: Input(s) for the screening run.
            sigma_inputs: List of |AbinitInput| for the self-energy run.
                If scr and sigma are lists of the same length, every sigma gets its own screening.
                If there is only one screening, all sigma inputs are linked to it.
            workdir: Working directory of the calculation.
            manager: |TaskManager| object.
        """
        super().__init__(workdir=workdir, manager=manager)

        spread_scr = (isinstance(sigma_inputs, (list, tuple)) and
                      isinstance(scr_inputs, (list, tuple)) and
                      len(sigma_inputs) == len(scr_inputs))
        #print("spread_scr", spread_scr)

        self.sigma_tasks = []

        # Register the GS-SCF run.
        # Register all scf_inputs but link the NSCF run only to the last SCF task in the list.
        # Multiple scf_inputs can be provided to perform convergence studies.
        if isinstance(scf_inputs, (list, tuple)):
            for scf_input in scf_inputs:
                self.scf_task = self.register_scf_task(scf_input)
        else:
            self.scf_task = self.register_scf_task(scf_inputs)

        # Register the NSCF run(s).
        if isinstance(nscf_inputs, (list, tuple)):
            for nscf_input in nscf_inputs:
                self.nscf_task = nscf_task = self.register_nscf_task(nscf_input, deps={self.scf_task: "DEN"})
        else:
            self.nscf_task = nscf_task = self.register_nscf_task(nscf_inputs, deps={self.scf_task: "DEN"})

        # Register the SCR and SIGMA run(s).
        if spread_scr:
            for scr_input, sigma_input in zip(scr_inputs, sigma_inputs):
                scr_task = self.register_scr_task(scr_input, deps={nscf_task: "WFK"})
                sigma_task = self.register_sigma_task(sigma_input, deps={nscf_task: "WFK", scr_task: "SCR"})
                self.sigma_tasks.append(sigma_task)
        else:
            # Sigma task(s) connected to the same screening.
            scr_task = self.register_scr_task(scr_inputs, deps={nscf_task: "WFK"})
            if isinstance(sigma_inputs, (list, tuple)):
                for inp in sigma_inputs:
                    task = self.register_sigma_task(inp, deps={nscf_task: "WFK", scr_task: "SCR"})
                    self.sigma_tasks.append(task)
            else:
                task = self.register_sigma_task(sigma_inputs, deps={nscf_task: "WFK", scr_task: "SCR"})
                self.sigma_tasks.append(task)


class SigmaConvWork(Work):
    """
    Work for self-energy convergence studies.

    .. rubric:: Inheritance Diagram
    .. inheritance-diagram:: SigmaConvWork
    """
    def __init__(self, wfk_node, scr_node, sigma_inputs, workdir=None, manager=None):
        """
        Args:
            wfk_node: The node that produced the WFK file, or a filepath pointing to the WFK file.
            scr_node: The node that produced the SCR file, or a filepath pointing to the SCR file.
            sigma_inputs: List of |AbinitInput| for the self-energy runs.
            workdir: Working directory of the calculation.
            manager: |TaskManager| object.
        """
        # Cast to node instances.
        wfk_node, scr_node = Node.as_node(wfk_node), Node.as_node(scr_node)

        super().__init__(workdir=workdir, manager=manager)

        # Register the SIGMA runs.
        if not isinstance(sigma_inputs, (list, tuple)):
            sigma_inputs = [sigma_inputs]

        for sigma_input in sigma_inputs:
            self.register_sigma_task(sigma_input, deps={wfk_node: "WFK", scr_node: "SCR"})


class BseMdfWork(Work):
    """
    Work for simple BSE calculations in which the self-energy corrections
    are approximated by the scissors operator and the screening is modeled
    with the model dielectric function.

    .. rubric:: Inheritance Diagram
    .. inheritance-diagram:: BseMdfWork
    """
    def __init__(self, scf_input, nscf_input, bse_inputs, workdir=None, manager=None):
        """
        Args:
            scf_input: Input for the SCF run.
            nscf_input: Input for the NSCF run.
            bse_inputs: List of Inputs for the BSE run.
            workdir: Working directory of the calculation.
            manager: |TaskManager|
        """
        super().__init__(workdir=workdir, manager=manager)

        # Register the GS-SCF run.
        self.scf_task = self.register_scf_task(scf_input)

        # Construct the input for the NSCF run.
        self.nscf_task = self.register_nscf_task(nscf_input, deps={self.scf_task: "DEN"})

        # Construct the input(s) for the BSE run.
        if not isinstance(bse_inputs, (list, tuple)):
            bse_inputs = [bse_inputs]

        for bse_input in bse_inputs:
            self.register_bse_task(bse_input, deps={self.nscf_task: "WFK"})

    def get_mdf_robot(self):
        """Builds and returns a :class:`MdfRobot` for analyzing the results in the MDF files."""
        from abipy.electrons.bse import MdfRobot
        robot = MdfRobot()
        for task in self[2:]:
            mdf_path = task.outdir.has_abiext(robot.EXT)
            if mdf_path:
                robot.add_file(str(task), mdf_path)
        return robot


class QptdmWork(Work):
    """
    This work parallelizes the calculation of the q-points of the screening.
    It also provides the callback `on_all_ok` that calls mrgscr to merge
    all the partial screening files produced.

    .. rubric:: Inheritance Diagram
    .. inheritance-diagram:: QptdmWork
    """
    def create_tasks(self, wfk_file, scr_input):
        """
        Create the SCR tasks and register them in self.

        Args:
            wfk_file: Path to the ABINIT WFK file to use for the computation of the screening.
            scr_input: Input for the screening calculation.
        """
        assert len(self) == 0
        wfk_file = self.wfk_file = os.path.abspath(wfk_file)

        # Build a temporary work in the tmpdir that will use a shell manager
        # to run ABINIT in order to get the list of q-points for the screening.
        shell_manager = self.manager.to_shell_manager(mpi_procs=1)

        w = Work(workdir=self.tmpdir.path_join("_qptdm_run"), manager=shell_manager)

        fake_input = scr_input.deepcopy()
        fake_task = w.register(fake_input)
        w.allocate()
        w.build()

        # Create the symbolic link and add the magic value
        # nqptdm = -1 to the input to get the list of q-points.
        fake_task.inlink_file(wfk_file)
        fake_task.set_vars({"nqptdm": -1})
        fake_task.start_and_wait()

        # Parse the section with the q-points.
        with NetcdfReader(fake_task.outdir.has_abiext("qptdms.nc")) as reader:
            qpoints = reader.read_value("reduced_coordinates_of_kpoints")

        # Now we can register the tasks for the different q-points.
        for qpoint in qpoints:
            qptdm_input = scr_input.deepcopy()
            qptdm_input.set_vars(nqptdm=1, qptdm=qpoint)
            new_task = self.register_scr_task(qptdm_input, manager=self.manager)
            # Add the garbage collector.
            if self.flow.gc is not None:
                new_task.set_gc(self.flow.gc)

        self.allocate()

    def merge_scrfiles(self, remove_scrfiles=True):
        """
        This method is called when all the q-points have been computed.
        It runs `mrgscr` in sequential on the local machine to produce
        the final SCR file in the outdir of the `Work`.
        If remove_scrfiles is True, the partial SCR files are removed after the merge.
        """
        scr_files = list(filter(None, [task.outdir.has_abiext("SCR") for task in self]))

        self.history.info("Will call mrgscr to merge %s SCR files:\n" % len(scr_files))
        assert len(scr_files) == len(self)

        mrgscr = wrappers.Mrgscr(manager=self[0].manager, verbose=1)
        final_scr = mrgscr.merge_qpoints(self.outdir.path, scr_files, out_prefix="out")

        if remove_scrfiles:
            for scr_file in scr_files:
                try:
                    os.remove(scr_file)
                except IOError:
                    pass

        return final_scr

    #@check_spectator
    def on_all_ok(self):
        """
        This method is called when all the q-points have been computed.
        It runs `mrgscr` in sequential on the local machine to produce
        the final SCR file in the outdir of the `Work`.
        """
        final_scr = self.merge_scrfiles()
        return self.Results(node=self, returncode=0, message="mrgscr done", final_scr=final_scr)

# TODO: MergeDdb --> DfptWork(Work) postpone it because it may break pickle.


class MergeDdb(object):
    """
    Mixin class for Works that have to merge the DDB files produced by the tasks.
    """

    def add_becs_from_scf_task(self, scf_task, ddk_tolerance, ph_tolerance):
        """
        Build tasks for the computation of Born effective charges and add them to the work.

        Args:
            scf_task: |ScfTask| object.
            ddk_tolerance: dict {"varname": value} with the tolerance used in the DDK run. None to use AbiPy default.
            ph_tolerance: dict {"varname": value} with the tolerance used in the phonon run. None to use AbiPy default.

        Return: (ddk_tasks, bec_tasks)
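
        Example:

            Sketch (called from within a work that mixes in MergeDdb;
            tolerance values are illustrative):

                ddk_tasks, bec_tasks = self.add_becs_from_scf_task(
                    scf_task, ddk_tolerance={"tolwfr": 1e-22}, ph_tolerance={"tolvrs": 1e-10})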
        """
        if not isinstance(scf_task, ScfTask):
            raise TypeError("task `%s` does not inherit from ScfTask" % scf_task)

        # DDK calculations (self-consistent to get electric field).
        multi_ddk = scf_task.input.make_ddk_inputs(tolerance=ddk_tolerance)

        ddk_tasks = []
        for ddk_inp in multi_ddk:
            ddk_task = self.register_ddk_task(ddk_inp, deps={scf_task: "WFK"})
            ddk_tasks.append(ddk_task)

        # Build the list of inputs for electric field perturbation and phonons.
        # Each BEC task is connected to all the previous DDK tasks and to the scf_task.
        bec_deps = {ddk_task: "DDK" for ddk_task in ddk_tasks}
        bec_deps.update({scf_task: "WFK"})

        bec_inputs = scf_task.input.make_bec_inputs(tolerance=ph_tolerance)
        bec_tasks = []
        for bec_inp in bec_inputs:
            bec_task = self.register_bec_task(bec_inp, deps=bec_deps)
            bec_tasks.append(bec_task)

        return ddk_tasks, bec_tasks

    def merge_ddb_files(self, delete_source_ddbs=False, only_dfpt_tasks=True,
                        exclude_tasks=None, include_tasks=None):
        """
        This method is called when all the q-points have been computed.
        It runs `mrgddb` in sequential on the local machine to produce
        the final DDB file in the outdir of the `Work`.

        Args:
            delete_source_ddbs: True if the input DDB files should be removed once the final DDB is created.
            only_dfpt_tasks: False to merge all DDB files produced by the tasks of the work.
                Useful e.g. for finite stress corrections in which the stress in the
                initial configuration should be merged in the final DDB.
            exclude_tasks: List of tasks that should be excluded when merging the partial DDB files.
            include_tasks: List of tasks that should be included when merging the partial DDB files.
                Mutually exclusive with exclude_tasks.

        Returns:
            path to the output DDB file
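
        Example:

            Sketch: merge the partial DDB files once all tasks have completed:

                out_ddb = work.merge_ddb_files(delete_source_ddbs=False)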
1373        """
1374        if exclude_tasks:
1375            my_tasks = [task for task in self if task not in exclude_tasks]
1376        elif include_tasks:
1377            my_tasks = [task for task in self if task in include_tasks]
1378        else:
1379            my_tasks = [task for task in self]
1380
1381        if only_dfpt_tasks:
1382            ddb_files = list(filter(None, [task.outdir.has_abiext("DDB") for task in my_tasks
1383                                    if isinstance(task, DfptTask)]))
1384        else:
1385            ddb_files = list(filter(None, [task.outdir.has_abiext("DDB") for task in my_tasks]))
1386
1387        self.history.info("Will call mrgddb to merge %s DDB files:" % len(ddb_files))
1388        # DDB files are always produces so this should never happen!
1389        if not ddb_files:
1390            raise RuntimeError("Cannot find any DDB file to merge by the task of " % self)
1391
1392        # Final DDB file will be produced in the outdir of the work.
1393        out_ddb = self.outdir.path_in("out_DDB")
1394
1395        if len(ddb_files) == 1:
1396            # Avoid the merge. Just copy the DDB file to the outdir of the work.
1397            shutil.copy(ddb_files[0], out_ddb)
1398        else:
1399            # Call mrgddb
1400            desc = "DDB file merged by %s on %s" % (self.__class__.__name__, time.asctime())
1401            mrgddb = wrappers.Mrgddb(manager=self[0].manager, verbose=0)
1402            mrgddb.merge(self.outdir.path, ddb_files, out_ddb=out_ddb, description=desc,
1403                         delete_source_ddbs=delete_source_ddbs)
1404
1405        return out_ddb
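
    # Usage sketch: this method is meant to be invoked from the on_all_ok callback
    # of the Work (see e.g. PhononWork below). A variation that also merges the DDB
    # files of non-DFPT tasks and removes the partial files afterwards:
    #
    #   out_ddb = self.merge_ddb_files(only_dfpt_tasks=False, delete_source_ddbs=True)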

    def merge_pot1_files(self, delete_source=False):
        """
        This method is called when all the q-points have been computed.
        It runs `mrgdvdb` sequentially on the local machine to produce
        the final DVDB file in the outdir of the `Work`.

        Args:
            delete_source: True if POT1 files should be removed after (successful) merge.

        Returns: path to the output DVDB file. None if no DFPT POT file is found.
        """
        natom = len(self[0].input.structure)
        max_pertcase = 3 * natom

        pot1_files = []
        for task in self:
            if not isinstance(task, DfptTask): continue
            paths = task.outdir.list_filepaths(wildcard="*_POT*")
            for path in paths:
                # Include only atomic perturbations, i.e. files whose pertcase index is <= 3 * natom.
                i = path.rindex("_POT")
                pertcase = int(path[i+4:].replace(".nc", ""))
                if pertcase <= max_pertcase:
                    pot1_files.append(path)

        # prtpot = 0 disables the output of the DFPT POT files so an empty list is not fatal here.
        if not pot1_files: return None

        self.history.info("Will call mrgdvdb to merge %s files:" % len(pot1_files))

        # The final DVDB file will be produced in the outdir of the work.
        out_dvdb = self.outdir.path_in("out_DVDB")

        if len(pot1_files) == 1:
            # Avoid the merge. Just copy the POT file to the outdir of the work.
            shutil.copy(pot1_files[0], out_dvdb)
        else:
            # FIXME: The merge may require a non-negligible amount of memory if there are many q-points.
            # Besides, there are machines such as lemaitre3 that are problematic when
            # running MPI applications on the front-end.
            mrgdvdb = wrappers.Mrgdvdb(manager=self[0].manager, verbose=0)
            mrgdvdb.merge(self.outdir.path, pot1_files, out_dvdb, delete_source=delete_source)

        return out_dvdb
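
    # Usage sketch: like merge_ddb_files, this method is meant to be called from
    # on_all_ok. Note that the None return value should be handled since the POT
    # files may be missing if the tasks ran with prtpot = 0:
    #
    #   out_dvdb = self.merge_pot1_files(delete_source=True)
    #   if out_dvdb is None:
    #       self.history.info("No DFPT POT file found, skipping DVDB merge")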


class PhononWork(Work, MergeDdb):
    """
    This work consists of nirred Phonon tasks where nirred is
    the number of irreducible atomic perturbations for a given set of q-points.
    It provides the callback method (on_all_ok) that calls mrgddb (mrgdvdb) to merge
    all the partial DDB (POT) files produced. The two files are available in the
    output directory of the Work.

    .. rubric:: Inheritance Diagram
    .. inheritance-diagram:: PhononWork
    """

    @classmethod
    def from_scf_task(cls, scf_task, qpoints, is_ngqpt=False, tolerance=None, with_becs=False,
                      ddk_tolerance=None, manager=None):
        """
        Construct a `PhononWork` from a |ScfTask| object.
        The input file for phonons is automatically generated from the input of the ScfTask.
        Each phonon task depends on the WFK file produced by the `scf_task`.

        Args:
            scf_task: |ScfTask| object.
            qpoints: q-points in reduced coordinates. Accepts a single q-point, a list of q-points
                or three integers defining the q-mesh if `is_ngqpt`.
            is_ngqpt: True if `qpoints` should be interpreted as divisions instead of q-points.
            tolerance: dict {"varname": value} with the tolerance to be used in the phonon run.
                None to use AbiPy default.
            with_becs: Activate calculation of Electric field and Born effective charges.
            ddk_tolerance: dict {"varname": value} with the tolerance used in the DDK run if with_becs.
                None to use AbiPy default.
            manager: |TaskManager| object.
        """
        if not isinstance(scf_task, ScfTask):
            raise TypeError("task `%s` does not inherit from ScfTask" % scf_task)

        if is_ngqpt:
            qpoints = scf_task.input.abiget_ibz(ngkpt=qpoints, shiftk=[0, 0, 0], kptopt=1).points
        qpoints = np.reshape(qpoints, (-1, 3))

        new = cls(manager=manager)
        if with_becs:
            new.add_becs_from_scf_task(scf_task, ddk_tolerance, ph_tolerance=tolerance)

        for qpt in qpoints:
            # Skip Gamma if with_becs since the BEC tasks already treat this q-point.
            if with_becs and np.sum(qpt ** 2) < 1e-12: continue
            multi = scf_task.input.make_ph_inputs_qpoint(qpt, tolerance=tolerance)
            for ph_inp in multi:
                new.register_phonon_task(ph_inp, deps={scf_task: "WFK"})

        return new
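
    # Usage sketch (illustrative): assuming `scf_task` is a ScfTask of a flow that
    # exposes register_work, phonons on a 4x4x4 q-mesh with Born effective charges
    # could be scheduled with:
    #
    #   work = PhononWork.from_scf_task(scf_task, qpoints=[4, 4, 4],
    #                                   is_ngqpt=True, with_becs=True)
    #   flow.register_work(work)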

    @classmethod
    def from_scf_input(cls, scf_input, qpoints, is_ngqpt=False, tolerance=None,
                       with_becs=False, ddk_tolerance=None, manager=None):
        """
        Similar to `from_scf_task`, but this method requires the input of the SCF
        calculation. A new |ScfTask| is created and added to the Work.
        Use this API if the DDB of the GS task should be merged as well.
        """
        if is_ngqpt:
            qpoints = scf_input.abiget_ibz(ngkpt=qpoints, shiftk=[0, 0, 0], kptopt=1).points

        qpoints = np.reshape(qpoints, (-1, 3))

        new = cls(manager=manager)
        # Create the ScfTask.
        scf_task = new.register_scf_task(scf_input)

        if with_becs:
            new.add_becs_from_scf_task(scf_task, ddk_tolerance, ph_tolerance=tolerance)

        for qpt in qpoints:
            # Skip Gamma if with_becs since the BEC tasks already treat this q-point.
            if with_becs and np.sum(qpt ** 2) < 1e-12: continue
            multi = scf_task.input.make_ph_inputs_qpoint(qpt, tolerance=tolerance)
            for ph_inp in multi:
                new.register_phonon_task(ph_inp, deps={scf_task: "WFK"})

        return new
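
    # Usage sketch (illustrative): same as above but starting from a bare AbinitInput
    # `scf_input`, so that the DDB of the GS task can be merged as well:
    #
    #   work = PhononWork.from_scf_input(scf_input, qpoints=[2, 2, 2],
    #                                    is_ngqpt=True, with_becs=False)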

    #@check_spectator
    def on_all_ok(self):
        """
        This method is called when all the q-points have been computed.
        It runs `mrgddb` and `mrgdvdb` sequentially on the local machine to produce
        the final DDB and DVDB files in the outdir of the |Work|.
        """
        # Merge DDB files.
        out_ddb = self.merge_ddb_files()
        # Merge DVDB files.
        out_dvdb = self.merge_pot1_files()

        return self.Results(node=self, returncode=0, message="DDB merge done")


class PhononWfkqWork(Work, MergeDdb):
    """
    This work computes phonons with DFPT on an arbitrary q-mesh (usually denser than the k-mesh for electrons)
    by computing WFQ files for each q-point.
    The number of irreducible atomic perturbations for each q-point is taken into account.
    It provides the callback method (on_all_ok) that calls mrgddb (mrgdvdb) to merge
    all the partial DDB (POT) files produced. The two files are available in the
    output directory of the Work. The WFQ files are removed at runtime.

    .. rubric:: Inheritance Diagram
    .. inheritance-diagram:: PhononWfkqWork
    """

    @classmethod
    def from_scf_task(cls, scf_task, ngqpt, ph_tolerance=None, tolwfr=1.0e-22, nband=None,
                      with_becs=False, ddk_tolerance=None, shiftq=(0, 0, 0), is_ngqpt=True, remove_wfkq=True,
                      prepgkk=0, manager=None):
        """
        Construct a `PhononWfkqWork` from a |ScfTask| object.
        The input files for WFQ and phonons are automatically generated from the input of the ScfTask.
        Each phonon task depends on the WFK file produced by scf_task and the associated WFQ file.

        Args:
            scf_task: |ScfTask| object.
            ngqpt: three integers defining the q-mesh.
            ph_tolerance: dict {"varname": value} with the tolerance for the phonon run.
                None to use AbiPy default.
            tolwfr: tolerance used to compute WFQ.
            nband: Number of bands used in the WFQ tasks (a buffer nbdbuf is added automatically).
                None to keep the value inherited from scf_task.
            with_becs: Activate calculation of Electric field and Born effective charges.
            ddk_tolerance: dict {"varname": value} with the tolerance used in the DDK run if with_becs.
                None to use AbiPy default.
            shiftq: Q-mesh shift. Multiple shifts are not supported.
            is_ngqpt: True if `ngqpt` should be interpreted as divisions defining the q-mesh;
                      otherwise it is an explicit list of q-points.
            remove_wfkq: Remove WFQ files once all their children have completed.
            prepgkk: 1 to activate computation of all 3*natom perts (debugging option).
            manager: |TaskManager| object.

        .. note::

            Use k-meshes with one shift and q-meshes commensurate with ngkpt
            to decrease the number of WFQ files to be computed.
        """
        if not isinstance(scf_task, ScfTask):
            raise TypeError("task `%s` does not inherit from ScfTask" % scf_task)

        shiftq = np.reshape(shiftq, (3, ))
        #print("ngqpt", ngqpt, "\nshiftq", shiftq)
        if is_ngqpt:
            qpoints = scf_task.input.abiget_ibz(ngkpt=ngqpt, shiftk=shiftq, kptopt=1).points
        else:
            qpoints = np.reshape(ngqpt, (-1, 3))

        new = cls(manager=manager)
        new.remove_wfkq = remove_wfkq
        new.phonon_tasks = []
        new.wfkq_tasks = []
        new.wfkq_task_children = collections.defaultdict(list)

        if with_becs:
            # Add DDK and BECS.
            new.add_becs_from_scf_task(scf_task, ddk_tolerance, ph_tolerance)

        # Get ngkpt, shift for electrons from input.
        # Won't try to skip WFQ if multiple shifts or off-diagonal kptrlatt.
        ngkpt, shiftk = scf_task.input.get_ngkpt_shiftk()
        #try_to_skip_wfkq = True
        #if ngkpt is not None and len(shiftk) == 1 and not is_ngqpt:
        #    try_to_skip_wfkq = True
        try_to_skip_wfkq = False

        # TODO: One could avoid kptopt 3 by computing WFK in the IBZ and then rotating,
        # but this has to be done inside Abinit.
        for qpt in qpoints:
            is_gamma = np.sum(qpt ** 2) < 1e-12
            if with_becs and is_gamma: continue

            # Avoid WFQ if k + q = k (requires ngkpt, multiple shifts are not supported).
            need_wfkq = True
            if is_gamma:
                need_wfkq = False
            elif try_to_skip_wfkq:
                # k = (i + shiftk) / ngkpt
                qinds = np.rint(qpt * ngqpt - shiftq)
                #print("qpt", qpt, "\nqinds", qinds, "\nngkpt", ngkpt, "\nngqpt", ngqpt)
                f = (qinds * ngkpt) % ngqpt
                need_wfkq = np.any(f != 0)

            if need_wfkq:
                nscf_inp = scf_task.input.new_with_vars(qpt=qpt, nqpt=1, iscf=-2, kptopt=3, tolwfr=tolwfr)
                if nband:
                    # nbdbuf must be an integer: use ~10% of nband with a minimum of 2 bands.
                    nbdbuf = max(2, int(nband * 0.1))
                    nscf_inp.set_vars(nband=nband + nbdbuf, nbdbuf=nbdbuf)
                wfkq_task = new.register_nscf_task(nscf_inp, deps={scf_task: ["DEN", "WFK"]})
                new.wfkq_tasks.append(wfkq_task)

            multi = scf_task.input.make_ph_inputs_qpoint(qpt, tolerance=ph_tolerance, prepgkk=prepgkk)
            for ph_inp in multi:
                deps = {scf_task: "WFK", wfkq_task: "WFQ"} if need_wfkq else {scf_task: "WFK"}
                #ph_inp["prtwf"] = -1
                t = new.register_phonon_task(ph_inp, deps=deps)
                new.phonon_tasks.append(t)
                if need_wfkq:
                    new.wfkq_task_children[wfkq_task].append(t)

        return new
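
    # Usage sketch (illustrative): phonons on a 6x6x6 q-mesh, usually denser than
    # the electron k-mesh, removing each WFQ file as soon as its children complete:
    #
    #   work = PhononWfkqWork.from_scf_task(scf_task, ngqpt=[6, 6, 6],
    #                                       with_becs=True, remove_wfkq=True)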

    def on_ok(self, sender):
        """
        This callback is called when one task reaches status `S_OK`.
        It removes the WFQ file if all its children have reached `S_OK`.
        """
        if self.remove_wfkq:
            for task in self.wfkq_tasks:
                if task.status != task.S_OK: continue
                children = self.wfkq_task_children[task]
                if all(child.status == child.S_OK for child in children):
                    path = task.outdir.has_abiext("WFQ")
                    if path:
                        self.history.info("Removing WFQ: %s" % path)
                        os.remove(path)

        return super().on_ok(sender)

    #@check_spectator
    def on_all_ok(self):
        """
        This method is called when all the q-points have been computed.
        It runs `mrgddb` and `mrgdvdb` sequentially on the local machine to produce
        the final DDB and DVDB files in the outdir of the |Work|.
        """
        # Merge DDB files.
        out_ddb = self.merge_ddb_files()

        # Merge DVDB files.
        out_dvdb = self.merge_pot1_files()

        return self.Results(node=self, returncode=0, message="DDB merge done")


class GKKPWork(Work):
    """
    This work computes electron-phonon matrix elements for all the q-points
    present in a DVDB and DDB file.

    .. rubric:: Inheritance Diagram
    .. inheritance-diagram:: GKKPWork
    """
    @classmethod
    def from_den_ddb_dvdb(cls, inp, den_path, ddb_path, dvdb_path, mpiprocs=1, remove_wfkq=True,
                          qpath=None, with_ddk=True, expand=True, manager=None):
        """
        Construct a `GKKPWork` from a DEN, DDB and DVDB file.
        For each q-point found, a WFQ task and an EPH task computing the matrix elements are created.
        """
        import abipy.abilab as abilab

        # Create file nodes.
        den_file = FileNode(den_path)
        ddb_file = FileNode(ddb_path)
        dvdb_file = FileNode(dvdb_path)

        # Create new work.
        new = cls(manager=manager)
        new.remove_wfkq = remove_wfkq
        new.wfkq_tasks = []
        new.wfkq_task_children = collections.defaultdict(list)
        if manager is None: manager = TaskManager.from_user_config()
        tm = manager.new_with_fixed_mpi_omp(mpiprocs, 1)

        # Create a WFK task.
        kptopt = 1 if expand else 3
        nscf_inp = inp.new_with_vars(iscf=-2, kptopt=kptopt)
        wfk_task = new.register_nscf_task(nscf_inp, deps={den_file: "DEN"}, manager=tm)
        new.wfkq_tasks.append(wfk_task)
        new.wfk_task = wfk_task

        # Read the q-path and the regular grid from the DDB file.
        with abilab.abiopen(ddb_path) as ddb:
            q_frac_coords = np.array([k.frac_coords for k in ddb.qpoints])
            ddb_ngqpt = ddb.guessed_ngqpt

        # If qpath is set, the DVDB file is interpolated on the given list of q-points.
        # In this case the DVDB and DDB files have to correspond to a regular grid.
        dvdb = dvdb_file
        if qpath is None:
            qpath = q_frac_coords
        else:
            interp_inp = inp.new_with_vars(optdriver=7, eph_task=-5, ddb_ngqpt=ddb_ngqpt,
                                           ph_nqpath=len(qpath), ph_qpath=qpath, prtphdos=0)
            dvdb = new.register_eph_task(interp_inp, deps={wfk_task: "WFK", ddb_file: "DDB", dvdb_file: "DVDB"},
                                         manager=tm)

        # Create a WFK expansion task.
        if expand:
            fbz_nscf_inp = inp.new_with_vars(optdriver=8)
            fbz_nscf_inp.set_spell_check(False)
            fbz_nscf_inp.set_vars(wfk_task="wfk_fullbz")
            tm_serial = manager.new_with_fixed_mpi_omp(1, 1)
            wfk_task = new.register_nscf_task(fbz_nscf_inp, deps={wfk_task: "WFK", den_file: "DEN"},
                                              manager=tm_serial)
            new.wfkq_tasks.append(wfk_task)
            new.wfk_task = wfk_task

        if with_ddk:
            kptopt = 3 if expand else 1
            ddk_inp = inp.new_with_vars(optdriver=8, kptopt=kptopt)
            ddk_inp.set_spell_check(False)
            ddk_inp.set_vars(wfk_task="wfk_ddk")
            ddk_task = new.register_nscf_task(ddk_inp, deps={wfk_task: "WFK", den_file: "DEN"}, manager=tm)
            new.wfkq_tasks.append(ddk_task)

        # For each q-point:
        for qpt in qpath:
            is_gamma = np.sum(qpt ** 2) < 1e-12
            if is_gamma:
                # Create a link from WFK to WFQ on_ok.
                wfkq_task = wfk_task
                deps = {wfk_task: ["WFK", "WFQ"], ddb_file: "DDB", dvdb: "DVDB"}
            else:
                # Create a WFQ task.
                nscf_inp = nscf_inp.new_with_vars(kptopt=3, qpt=qpt, nqpt=1)
                wfkq_task = new.register_nscf_task(nscf_inp, deps={den_file: "DEN"}, manager=tm)
                new.wfkq_tasks.append(wfkq_task)
                deps = {wfk_task: "WFK", wfkq_task: "WFQ", ddb_file: "DDB", dvdb: "DVDB"}

            # Create an EPH task.
            eph_inp = inp.new_with_vars(optdriver=7, prtphdos=0, eph_task=-2, kptopt=3,
                                        ddb_ngqpt=[1, 1, 1], nqpt=1, qpt=qpt)
            t = new.register_eph_task(eph_inp, deps=deps, manager=tm)
            new.wfkq_task_children[wfkq_task].append(t)

        return new
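
    # Usage sketch (illustrative, paths and q-path are placeholders): assuming `inp`
    # is the AbinitInput of the GS run and the DEN/DDB/DVDB files come from previous
    # works:
    #
    #   work = GKKPWork.from_den_ddb_dvdb(inp, den_path, ddb_path, dvdb_path,
    #                                     mpiprocs=4,
    #                                     qpath=[[0, 0, 0], [0.5, 0, 0]])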

    @classmethod
    def from_phononwfkq_work(cls, phononwfkq_work, nscf_vars={}, remove_wfkq=True, with_ddk=True, manager=None):
        """
        Construct a `GKKPWork` from a `PhononWfkqWork` object.
        The WFQ files are the ones used in the PhononWfkqWork so, in principle, they contain only valence bands.
        """
        # Get the list of q-points from the phonon tasks in this work.
        qpoints = []
        qpoints_deps = []
        for task in phononwfkq_work:
            if isinstance(task, PhononTask):
                # Store the q-point.
                qpt = task.input.get("qpt", [0, 0, 0])
                qpoints.append(qpt)
                # Store the dependencies.
                qpoints_deps.append(task.deps)

        # Create file nodes.
        ddb_path = phononwfkq_work.outdir.has_abiext("DDB")
        dvdb_path = phononwfkq_work.outdir.has_abiext("DVDB")
        ddb_file = FileNode(ddb_path)
        dvdb_file = FileNode(dvdb_path)

        # Get the scf_task from the first q-point.
        for dep in qpoints_deps[0]:
            if isinstance(dep.node, ScfTask) and dep.exts[0] == 'WFK':
                scf_task = dep.node

        # Create new work.
        new = cls(manager=manager)
        new.remove_wfkq = remove_wfkq
        new.wfkq_tasks = []
        new.wfk_task = []

        # Add one EPH task per q-point.
        for qpt, qpoint_deps in zip(qpoints, qpoints_deps):
            # Create the EPH task.
            eph_input = scf_task.input.new_with_vars(optdriver=7, prtphdos=0, eph_task=-2,
                                                     ddb_ngqpt=[1, 1, 1], nqpt=1, qpt=qpt)
            deps = {ddb_file: "DDB", dvdb_file: "DVDB"}
            for dep in qpoint_deps:
                deps[dep.node] = dep.exts[0]
            # If there is no WFQ in the deps, link the WFK file with the WFQ extension.
            if 'WFQ' not in deps.values():
                inv_deps = dict((v, k) for k, v in deps.items())
                wfk_task = inv_deps['WFK']
                wfk_path = wfk_task.outdir.has_abiext("WFK")
                # Check if netcdf: keep the same extension as the WFK file.
                filename, extension = os.path.splitext(wfk_path)
                infile = 'out_WFQ' + extension
                wfq_path = os.path.join(os.path.dirname(wfk_path), infile)
                if not os.path.isfile(wfq_path): os.symlink(wfk_path, wfq_path)
                deps[FileNode(wfq_path)] = 'WFQ'
            new.register_eph_task(eph_input, deps=deps)

        return new
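
    # Usage sketch (illustrative): reuse the WFQ/DDB/DVDB files of a completed
    # PhononWfkqWork `ph_work`, bearing in mind the note above about valence bands:
    #
    #   gkkp_work = GKKPWork.from_phononwfkq_work(ph_work)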

    def on_ok(self, sender):
        """
        This callback is called when one task reaches status `S_OK`.
        It removes the WFQ file if all its children have reached `S_OK`.
        """
        if self.remove_wfkq:
            for task in self.wfkq_tasks:
                if task.status != task.S_OK: continue
                children = self.wfkq_task_children[task]
                if all(child.status == child.S_OK for child in children):
                    path = task.outdir.has_abiext("WFQ")
                    if path:
                        self.history.info("Removing WFQ: %s" % path)
                        os.remove(path)

        # If the sender is the WFK task, create a link to a WFQ file so that Abinit is happy.
        if sender == self.wfk_task:
            wfk_path = self.wfk_task.outdir.has_abiext("WFK")
            # Check if netcdf: keep the same extension as the WFK file.
            filename, extension = os.path.splitext(wfk_path)
            infile = 'out_WFQ' + extension
            infile = os.path.join(os.path.dirname(wfk_path), infile)
            os.symlink(wfk_path, infile)

        return super().on_ok(sender)



class BecWork(Work, MergeDdb):
    """
    Work for the computation of the Born effective charges.

    This work consists of DDK tasks and phonon + electric field perturbation tasks.
    It provides the callback method (on_all_ok) that calls mrgddb to merge the
    partial DDB files produced by the work.

    .. rubric:: Inheritance Diagram
    .. inheritance-diagram:: BecWork
    """

    @classmethod
    def from_scf_task(cls, scf_task, ddk_tolerance=None, ph_tolerance=None, manager=None):
        """
        Build tasks for the computation of Born effective charges from a ground-state task.

        Args:
            scf_task: |ScfTask| object.
            ddk_tolerance: dict {"varname": value} with the tolerance used in the DDK run.
                None to use AbiPy default.
            ph_tolerance: dict {"varname": value} with the tolerance used in the phonon run.
                None to use AbiPy default.
            manager: |TaskManager| object.
        """
        new = cls(manager=manager)
        new.add_becs_from_scf_task(scf_task, ddk_tolerance, ph_tolerance)
        return new
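
    # Usage sketch (illustrative): the tolerance dict follows the {"varname": value}
    # format documented above and is given here only as an example:
    #
    #   work = BecWork.from_scf_task(scf_task, ddk_tolerance={"tolwfr": 1e-22})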

    def on_all_ok(self):
        """
        This method is called when all tasks reach S_OK.
        It runs `mrgddb` sequentially on the local machine to produce
        the final DDB file in the outdir of the |Work|.
        """
        # Merge DDB files.
        out_ddb = self.merge_ddb_files()
        return self.Results(node=self, returncode=0, message="DDB merge done")


class DteWork(Work, MergeDdb):
    """
    Work for the computation of the third derivative of the energy.

    This work consists of DDK tasks and electric field perturbation tasks.
    It provides the callback method (on_all_ok) that calls mrgddb to merge the partial DDB files produced.

    .. rubric:: Inheritance Diagram
    .. inheritance-diagram:: DteWork
    """
    @classmethod
    def from_scf_task(cls, scf_task, ddk_tolerance=None, manager=None):
        """
        Build a DteWork from a ground-state task.

        Args:
            scf_task: |ScfTask| object.
            ddk_tolerance: dict {"varname": value} with the tolerance used in the DDK run.
                None to use AbiPy default.
            manager: |TaskManager| object.
        """
        if not isinstance(scf_task, ScfTask):
            raise TypeError("task `%s` does not inherit from ScfTask" % scf_task)

        new = cls(manager=manager)

        # DDK calculations.
        multi_ddk = scf_task.input.make_ddk_inputs(tolerance=ddk_tolerance)

        ddk_tasks = []
        for ddk_inp in multi_ddk:
            ddk_task = new.register_ddk_task(ddk_inp, deps={scf_task: "WFK"})
            ddk_tasks.append(ddk_task)

        # Build the list of inputs for the electric field perturbation.
        # Each DDE task is connected to all the previous DDK tasks and to the scf_task.
        multi_dde = scf_task.input.make_dde_inputs(use_symmetries=False)

        # To compute the nonlinear coefficients, all the directions of the perturbation
        # have to be taken into account.
        # DDE calculations.
        dde_tasks = []
        dde_deps = {ddk_task: "DDK" for ddk_task in ddk_tasks}
        dde_deps.update({scf_task: "WFK"})
        for dde_inp in multi_dde:
            dde_task = new.register_dde_task(dde_inp, deps=dde_deps)
            dde_tasks.append(dde_task)

        # DTE calculations. Each DTE task depends on the scf_task and on all the DDE tasks.
        dte_deps = {scf_task: "WFK DEN"}
        dte_deps.update({dde_task: "1WF 1DEN" for dde_task in dde_tasks})

        multi_dte = scf_task.input.make_dte_inputs()
        dte_tasks = []
        for dte_inp in multi_dte:
            dte_task = new.register_dte_task(dte_inp, deps=dte_deps)
            dte_tasks.append(dte_task)

        return new
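
    # Usage sketch (illustrative): build the DDK -> DDE -> DTE chain from a
    # ground-state task with the default tolerances:
    #
    #   work = DteWork.from_scf_task(scf_task)
    #   flow.register_work(work)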

    def on_all_ok(self):
        """
        This method is called when all tasks reach S_OK.
        It runs `mrgddb` sequentially on the local machine to produce
        the final DDB file in the outdir of the `Work`.
        """
        # Merge DDB files.
        out_ddb = self.merge_ddb_files()
        return self.Results(node=self, returncode=0, message="DDB merge done")


class ConducWork(Work):
    """
    Workflow for the computation of electrical conductivity.

    Can be constructed from:
        1. a |MultiDataset| and a |PhononWork|, or
        2. a |MultiDataset| plus the DDB and DVDB filepaths.
    The kerange capability can be activated with with_kerange=True.

    This work consists of 3 tasks (5 tasks with kerange):
        1. SCF GS
        2. NSCF
        3. Kerange (kerange only)
        4. WFK interpolation (kerange only)
        5. Electrical conductivity calculation.
    """

    @classmethod
    def from_phwork(cls, phwork, multi, nbr_proc=None, flow=None, with_kerange=False,
                    omp_nbr_thread=1, manager=None):
        """
        Construct a |ConducWork| from a |PhononWork| and a |MultiDataset|.

        Args:
            phwork: a |PhononWork| object calculating the DDB and DVDB files.
            multi: a |MultiDataset| object containing a list of 3 datasets (5 with kerange).
                See abipy/abio/factories.py -> conduc_from_scf_nscf_inputs for details about multi.
            nbr_proc: Required if with_kerange since autoparal doesn't work with optdriver=8.
            flow: The flow calling the work. Used for with_fixed_mpi_omp.
            with_kerange: True if using kerange.
            omp_nbr_thread: Number of OpenMP threads to use.
            manager: |TaskManager| of the task. If None, the manager is initialized from the config file.
        """
        # Verify phwork.
        if not isinstance(phwork, PhononWork):
            raise TypeError("Work `%s` does not inherit from PhononWork" % phwork)
        # Verify multi.
        if not with_kerange and multi.ndtset != 3:  # Without kerange, multi should contain 3 datasets.
            raise ValueError("The |MultiDataset| object should contain 3 datasets but it has %s. "
                             "You should generate multi with the factory function "
                             "conduc_from_scf_nscf_inputs." % multi.ndtset)
        if with_kerange and multi.ndtset != 5:  # With kerange, multi should contain 5 datasets.
            raise ValueError("The |MultiDataset| object should contain 5 datasets but it has %s. "
                             "You should generate multi with the factory function "
                             "conduc_from_scf_nscf_inputs." % multi.ndtset)
        # Verify that nbr_proc and flow are defined if with_kerange.
        if with_kerange and (flow is None or nbr_proc is None):
            raise ValueError("When using kerange, the arguments flow and nbr_proc must be passed "
                             "to from_phwork: flow = %s, nbr_proc = %s" % (flow, nbr_proc))

        new = cls(manager=manager)

        new.register_task(multi[0])
        new.register_task(multi[1], deps={new[0]: "DEN"})
        task_number = 2  # To keep track of the task index in new and multi.

        if with_kerange:  # Using kerange.
            new.register_task(multi[2], deps={new[1]: "WFK"})
            new.register_task(multi[3], deps={new[0]: "DEN", new[1]: "WFK", new[2]: "KERANGE.nc"})
            task_number = 4  # We have 2 more datasets.

        new.register_task(multi[task_number], deps={new[task_number - 1]: "WFK", phwork: ["DDB", "DVDB"]})

        for task in new:
            task.set_work(new)

        # Manual parallelization since autoparal doesn't work with optdriver=8 (t2).
        if flow is not None:
            new.set_flow(flow)
            if nbr_proc is not None:
                for task in new[2:]:
                    task.with_fixed_mpi_omp(nbr_proc, omp_nbr_thread)
        return new
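
    # Usage sketch (illustrative): assuming `multi` was built with the factory
    # function conduc_from_scf_nscf_inputs (5 datasets for kerange) and `ph_work`
    # is a PhononWork of the same flow:
    #
    #   work = ConducWork.from_phwork(ph_work, multi, nbr_proc=16, flow=flow,
    #                                 with_kerange=True)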

    @classmethod
    def from_filepath(cls, ddb_path, dvdb_path, multi, nbr_proc=None, flow=None,
                      with_kerange=False, omp_nbr_thread=1, manager=None):
        """
        Construct a |ConducWork| from previously computed DDB/DVDB files and a |MultiDataset|.

        Args:
            multi: a |MultiDataset| object containing a list of 3 datasets (5 with kerange).
                See abipy/abio/factories.py -> conduc_from_scf_nscf_inputs for details about multi.
            ddb_path: a string containing the path to the DDB file.
            dvdb_path: a string containing the path to the DVDB file.
            nbr_proc: Required if with_kerange since autoparal doesn't work with optdriver=8.
            flow: The flow calling the work, only needed if with_kerange.
            with_kerange: True if using kerange.
            omp_nbr_thread: Number of OpenMP threads to use.
            manager: |TaskManager| of the task. If None, the manager is initialized from the config file.
        """
        # Verify multi.
        if not with_kerange and multi.ndtset != 3:  # Without kerange, multi should contain 3 datasets.
            raise ValueError("The |MultiDataset| object should contain 3 datasets but it has %s. "
                             "You should generate multi with the factory function "
                             "conduc_from_scf_nscf_inputs." % multi.ndtset)
        if with_kerange and multi.ndtset != 5:  # With kerange, multi should contain 5 datasets.
            raise ValueError("The |MultiDataset| object should contain 5 datasets but it has %s. "
                             "You should generate multi with the factory function "
                             "conduc_from_scf_nscf_inputs." % multi.ndtset)
        # Make sure both files exist.
        if not os.path.exists(ddb_path):
            raise ValueError("The DDB file does not exist: `%s`" % ddb_path)
        if not os.path.exists(dvdb_path):
            raise ValueError("The DVDB file does not exist: `%s`" % dvdb_path)
        # Verify that nbr_proc and flow are defined if with_kerange.
        if with_kerange and (flow is None or nbr_proc is None):
            raise ValueError("When using kerange, the arguments flow and nbr_proc must be passed "
                             "to from_filepath: flow = %s, nbr_proc = %s" % (flow, nbr_proc))

        new = cls(manager=manager)

        new.register_task(multi[0])
        new.register_task(multi[1], deps={new[0]: "DEN"})
        task_number = 2  # To keep track of the task index in new and multi.

        if with_kerange:  # Using kerange.
            new.register_task(multi[2], deps={new[1]: "WFK"})
            new.register_task(multi[3], deps={new[0]: "DEN", new[1]: "WFK", new[2]: "KERANGE.nc"})
            task_number = 4  # We have 2 more tasks.

        new.register_task(multi[task_number], deps={new[task_number - 1]: "WFK", ddb_path: "DDB", dvdb_path: "DVDB"})

        for task in new:
            task.set_work(new)

        # Manual parallelization since autoparal doesn't work with optdriver=8 (t2).
        if flow is not None:
            new.set_flow(flow)
            if nbr_proc is not None:
                for task in new[2:]:
                    task.with_fixed_mpi_omp(nbr_proc, omp_nbr_thread)
        return new

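    # Usage sketch (illustrative, the paths are placeholders): same as from_phwork
    # but starting from precomputed files:
    #
    #   work = ConducWork.from_filepath("out_DDB", "out_DVDB", multi,
    #                                   nbr_proc=16, flow=flow, with_kerange=True)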