1# coding: utf-8
2"""
3This module defines the Node class that is inherited by Task, Work and Flow objects.
4"""
5
6import sys
7import os
8import time
9import collections
10import abc
11import numpy as np
12
13from collections import OrderedDict
14from pprint import pprint
15from pymatgen.util.io_utils import AtomicFile
16from pydispatch import dispatcher
17from monty.termcolor import colored
18from monty.serialization import loadfn
19from monty.string import is_string
20from monty.io import FileLock
21from monty.collections import AttrDict, Namespace
22from monty.functools import lazy_property
23from monty.json import MSONable
24from pymatgen.util.serialization import json_pretty_dump, pmg_serialize
25from .utils import File, Directory, Dirviz, irdvars_for_ext, abi_extensions
26
27
28import logging
29logger = logging.getLogger(__name__)
30
31
32def _2attrs(item):
33    return item if item is None or isinstance(list, tuple) else (item,)
34
35
class Status(int):
    """An integer subclass that encodes the status of a |Node|."""

    # One row per status: (value, name, color, on_color, attrs).
    # See monty.termcolor for the meaning of color, on_color and attrs.
    _STATUS_INFO = [
        (1, "Initialized", None, None, None),          # Node has been initialized.
        (2, "Locked", "grey", None, None),             # Must be unlocked by an external subject (Work).
        (3, "Ready", None, None, None),                # All dependencies have status S_OK.
        (4, "Submitted", "blue", None, None),          # Task submitted / Work finalization started.
        (5, "Running", "magenta", None, None),         # Node is running.
        (6, "Done", None, None, None),                 # Done, but results may still be wrong/incomplete.
        (7, "AbiCritical", "red", None, None),         # ABINIT raised an error.
        (8, "QCritical", "red", "on_white", None),     # Error while submitting/executing the queue script.
        (9, "Unconverged", "red", "on_yellow", None),  # Iterative algorithm did not converge.
        (10, "Error", "red", None, None),              # Unrecoverable error (fix attempts failed).
        (11, "Completed", "green", None, None),        # Execution completed successfully.
    ]
    _STATUS2STR = OrderedDict((row[0], row[1]) for row in _STATUS_INFO)
    _STATUS2COLOR_OPTS = OrderedDict(
        (row[0], {"color": row[2], "on_color": row[3], "attrs": _2attrs(row[4])})
        for row in _STATUS_INFO)

    def __repr__(self):
        return "<%s: %s, at %s>" % (self.__class__.__name__, str(self), id(self))

    def __str__(self):
        """Human-readable name associated to the integer value."""
        return self._STATUS2STR[self]

    @classmethod
    def as_status(cls, obj):
        """Convert `obj` into a Status instance. None is passed through."""
        if obj is None:
            return None
        if isinstance(obj, cls):
            return obj
        return cls.from_string(obj)

    @classmethod
    def from_string(cls, s):
        """Build a `Status` from its string representation. Raise ValueError if unknown."""
        for value, name in cls._STATUS2STR.items():
            if name == s:
                return cls(value)
        raise ValueError("Wrong string %s" % s)

    @classmethod
    def all_status_strings(cls):
        """All possible status names as a list of strings."""
        return [row[1] for row in cls._STATUS_INFO]

    @property
    def is_critical(self):
        """True if the status signals a problem that requires intervention."""
        return str(self) in ("AbiCritical", "QCritical", "Unconverged", "Error")

    @property
    def color_opts(self):
        """Keyword options passed to monty's `colored` for this status."""
        return self._STATUS2COLOR_OPTS[self]

    @property
    def colored(self):
        """Return colorized text used to print the status if the stream supports it."""
        return colored(str(self), **self.color_opts)
108
109
class Dependency(object):
    """
    This object describes the dependencies among the nodes of a calculation.

    A `Dependency` consists of a `Node` that produces a list of products (files)
    that are used by the other nodes (`Task` or `Work`) to start the calculation.
    One usually creates the object by calling work.register

    Example:

        # Register the SCF task in work.
        scf_task = work.register(scf_strategy)

        # Register the NSCF calculation and its dependency on the SCF run via deps.
        nscf_task = work.register(nscf_strategy, deps={scf_task: "DEN"})
    """
    def __init__(self, node, exts=None):
        """
        Args:
            node: The task or the workflow associated to the dependency or string with a filepath.
            exts: Extensions of the output files that are needed for running the other tasks.
                  Either a list/tuple of strings or a single string with blank-separated tokens.
        """
        self._node = Node.as_node(node)

        # Bug fix: the default `exts=None` used to raise TypeError in the list
        # comprehensions below. Treat None as "no extensions".
        if exts is None:
            exts = []
        elif is_string(exts):
            exts = exts.split()

        # Extensions of the output files produced by node e.g. "DEN".
        self.exts = [e for e in exts if not e.startswith("@")]

        # Save getters. These have the form "@property" and are resolved by apply_getters.
        self.getters = [e for e in exts if e.startswith("@")]

    def __hash__(self):
        return hash(self._node)

    def __repr__(self):
        return "node %s will produce: %s " % (repr(self.node), repr(self.exts))

    def __str__(self):
        return "node %s will produce: %s " % (str(self.node), str(self.exts))

    @property
    def info(self):
        """String with the node producing the products."""
        return str(self.node)

    @property
    def node(self):
        """The |Node| associated to the dependency."""
        return self._node

    @property
    def status(self):
        """The status of the dependency, i.e. the status of the |Node|."""
        return self.node.status

    @lazy_property
    def products(self):
        """List of output files produced by self."""
        _products = []
        for ext in self.exts:
            prod = Product(ext, self.node.opath_from_ext(ext))
            _products.append(prod)

        return _products

    def apply_getters(self, task):
        """
        This function is called when we specify the task dependencies with the syntax:

            deps={node: "@property"}

        In this case the task has to the get `property` from `node` before starting the calculation.

        At present, the following properties are supported:

            - @structure
        """
        if not self.getters: return

        for getter in self.getters:
            if getter == "@structure":
                task.history.info("Getting structure from %s" % self.node)
                new_structure = self.node.get_final_structure()
                task._change_structure(new_structure)
            else:
                raise ValueError("Wrong getter %s" % getter)

    def connecting_vars(self):
        """
        Returns a dictionary with the variables that must be added to the
        input file in order to connect this |Node| to its dependencies.
        """
        # Renamed from `vars` to avoid shadowing the builtin.
        conn_vars = {}
        for prod in self.products:
            conn_vars.update(prod.connecting_vars())

        return conn_vars

    def get_filepaths_and_exts(self):
        """Returns the paths of the output files produced by self and its extensions"""
        filepaths = [prod.filepath for prod in self.products]
        exts = [prod.ext for prod in self.products]

        return filepaths, exts
215
216
class Product(object):
    """
    A product represents an output file produced by an ABINIT instance.
    This file is needed to start another `Task` or another `Work`.
    """
    def __init__(self, ext, path):
        """
        Args:
            ext: ABINIT file extension. Must be registered in the internal database.
            path: (absolute) filepath.
        """
        if ext not in abi_extensions():
            raise ValueError("Extension `%s` has not been registered in the internal database" % str(ext))

        self.ext = ext
        self.file = File(path)

    @classmethod
    def from_file(cls, filepath):
        """Build a :class:`Product` instance from a filepath."""
        # Scan the suffixes of filepath from longest to shortest and stop at
        # the first one that is a registered abinit extension.
        for start in range(len(filepath)):
            suffix = filepath[start:]
            if suffix in abi_extensions():
                return cls(suffix, filepath)

        raise ValueError("Cannot detect abinit extension in %s" % filepath)

    def __str__(self):
        return "File=%s, Extension=%s, " % (self.file.path, self.ext)

    @property
    def filepath(self):
        """Absolute path of the file."""
        return self.file.path

    def connecting_vars(self):
        """
        Returns a dictionary with the ABINIT variables that
        must be used to make the code use this file.
        """
        return irdvars_for_ext(self.ext)
261
262
class GridFsFile(AttrDict):
    """Information on a file that will be stored in the MongoDb gridfs collection."""
    def __init__(self, path, fs_id=None, mode="b"):
        # path: absolute path of the file to upload.
        # fs_id: GridFS identifier, filled in after the upload (see NodeResults.update_collection).
        # mode: "b" for binary files (default), "t" for text files.
        super().__init__(path=path, fs_id=fs_id, mode=mode)
267
268
class NodeResults(dict, MSONable):
    """
    Dictionary used to store the most important results produced by a |Node|.

    Provides helpers to register files for GridFS upload, accumulate exception
    messages, serialize to/from JSON and push the results to a MongoDB collection.
    """
    # Schema (validictory style) used by validate_json_schema.
    JSON_SCHEMA = {
        "type": "object",
        "properties": {
            "node_id": {"type": "integer", "required": True},
            "node_finalized": {"type": "boolean", "required": True},
            "node_history": {"type": "array", "required": True},
            "node_class": {"type": "string", "required": True},
            "node_name": {"type": "string", "required": True},
            "node_status": {"type": "string", "required": True},
            "in": {"type": "object", "required": True, "description": "dictionary with input parameters"},
            "out": {"type": "object", "required": True, "description": "dictionary with the output results"},
            "exceptions": {"type": "array", "required": True},
            "files": {"type": "object", "required": True},
        },
    }

    @classmethod
    def from_node(cls, node):
        """Initialize an instance of `NodeResults` from a `Node` subclass."""
        kwargs = dict(
            node_id=node.node_id,
            node_finalized=node.finalized,
            node_history=list(node.history),
            node_name=node.name,
            node_class=node.__class__.__name__,
            node_status=str(node.status),
        )

        # Go through node.Results so that subclasses can provide their own Results class.
        return node.Results(node, **kwargs)

    def __init__(self, node, **kwargs):
        """
        Args:
            node: |Node| that produced these results.
            kwargs: initial entries of the dictionary.
        """
        super().__init__(**kwargs)
        self.node = node

        # Make sure the mandatory sections are always present.
        if "in" not in self: self["in"] = Namespace()
        if "out" not in self: self["out"] = Namespace()
        if "exceptions" not in self: self["exceptions"] = []
        if "files" not in self: self["files"] = Namespace()

    @property
    def exceptions(self):
        """List of strings with the exceptions pushed so far."""
        return self["exceptions"]

    @property
    def gridfs_files(self):
        """List with the absolute paths of the files to be put in GridFs."""
        return self["files"]

    def register_gridfs_files(self, **kwargs):
        """
        This function registers the files that will be saved in GridFS.
        kwargs is a dictionary mapping the key associated to the file (usually the extension)
        to the absolute path. By default, files are assumed to be in binary form, for formatted files
        one should pass a tuple ("filepath", "t").

        Example::

            results.register_gridfs(GSR="path/to/GSR.nc", text_file=("/path/to/txt_file", "t"))

        The GSR file is a binary file, whereas text_file is a text file.

        Returns: self, to allow chaining.
        """
        d = {}
        for k, v in kwargs.items():
            mode = "b"
            # A (path, mode) tuple/list overrides the default binary mode.
            if isinstance(v, (list, tuple)): v, mode = v
            d[k] = GridFsFile(path=v, mode=mode)

        self["files"].update(d)
        return self

    def push_exceptions(self, *exceptions):
        """Store the string representation of the exceptions, skipping duplicates."""
        for exc in exceptions:
            newstr = str(exc)
            if newstr not in self.exceptions:
                self["exceptions"] += [newstr,]

    @pmg_serialize
    def as_dict(self):
        return self.copy()

    @classmethod
    def from_dict(cls, d):
        # NOTE(review): the filtered dict is passed as the `node` positional
        # argument of __init__, not as the dictionary payload -- confirm that
        # this is intended before relying on as_dict/from_dict round-tripping.
        return cls({k: v for k, v in d.items() if k not in ("@module", "@class")})

    def json_dump(self, filename):
        """Write the results to `filename` in JSON format."""
        json_pretty_dump(self.as_dict(), filename)

    @classmethod
    def json_load(cls, filename):
        """Reconstruct the results from a JSON file."""
        return cls.from_dict(loadfn(filename))

    def validate_json_schema(self):
        """Validate self against JSON_SCHEMA. Return True if valid, else print info and return False."""
        import validictory
        d = self.as_dict()
        try:
            validictory.validate(d, self.JSON_SCHEMA)
            return True
        except ValueError as exc:
            pprint(d)
            print(exc)
            return False

    def update_collection(self, collection):
        """
        Update a mongodb collection.
        """
        node = self.node
        flow = node if node.is_flow else node.flow

        # Build the key used to store the entry in the document.
        key = node.name
        if node.is_task:
            key = "w" + str(node.pos[0]) + "_t" + str(node.pos[1])
        elif node.is_work:
            key = "w" + str(node.pos)

        db = collection.database

        # Save files with GridFs first in order to get the ID.
        if self.gridfs_files:
            import gridfs
            fs = gridfs.GridFS(db)
            for ext, gridfile in self.gridfs_files.items():
                # Bug fix: the original passed str(gridfile) as an extra positional
                # argument to a format string without placeholders, which makes the
                # logging module report an internal formatting error.
                logger.info("gridfs: about to put file: %s", gridfile)
                # Here we set gridfile.fs_id that will be stored in the mondodb document
                try:
                    with open(gridfile.path, "r" + gridfile.mode) as f:
                        gridfile.fs_id = fs.put(f, filename=gridfile.path)
                except IOError as exc:
                    logger.critical(str(exc))

        if flow.mongo_id is None:
            # Flow does not have a mongo_id, allocate doc for the flow and save its id.
            # NOTE(review): collection.insert/save are deprecated in modern pymongo
            # (insert_one/replace_one) -- left unchanged to preserve behavior.
            flow.mongo_id = collection.insert({})
            print("Creating flow.mongo_id", flow.mongo_id, type(flow.mongo_id))

        # Get the document from flow.mongo_id and update it.
        doc = collection.find_one({"_id": flow.mongo_id})
        if key in doc:
            raise ValueError("%s is already in doc!" % key)
        doc[key] = self.as_dict()

        collection.save(doc)
        #collection.update({'_id':mongo_id}, {"$set": doc}, upsert=False)
415
416
def check_spectator(node_method):
    """
    Decorator for |Node| methods with side effects.
    Warns (instead of raising `SpectatorNodeError`) when the node is in spectator mode.
    """
    from functools import wraps

    @wraps(node_method)
    def wrapper(*args, **kwargs):
        this_node = args[0]
        if this_node.in_spectator_mode:
            # For the time being we only warn instead of raising node.SpectatorError.
            import warnings
            warnings.warn("You should not call %s when the node in spectator_mode" % str(node_method))

        return node_method(*args, **kwargs)

    return wrapper
433
434
class NodeError(Exception):
    """Base Exception raised by |Node| subclasses"""
    # Exposed to clients as Node.Error.


class SpectatorNodeError(NodeError):
    """
    Exception raised by |Node| methods when the node is in spectator mode
    and we are calling a method with side effects.
    """
    # Exposed to clients as Node.SpectatorError. At present check_spectator only
    # warns instead of raising this exception.
444
445
class Node(metaclass=abc.ABCMeta):
    """
    Abstract base class defining the interface that must be
    implemented by the nodes of the calculation.

    Nodes are hashable and can be tested for equality
    (equality and hashing are based on the node identifier).
    """
    # Class used to store the results of the node; subclasses may override it.
    Results = NodeResults

    # Exception classes exposed to clients.
    Error = NodeError
    SpectatorError = SpectatorNodeError

    # Possible status of the node.
    S_INIT = Status.from_string("Initialized")
    S_LOCKED = Status.from_string("Locked")
    S_READY = Status.from_string("Ready")
    S_SUB = Status.from_string("Submitted")
    S_RUN = Status.from_string("Running")
    S_DONE = Status.from_string("Done")
    S_ABICRITICAL = Status.from_string("AbiCritical")
    S_QCRITICAL = Status.from_string("QCritical")
    S_UNCONVERGED = Status.from_string("Unconverged")
    #S_CANCELLED = Status.from_string("Cancelled")
    S_ERROR = Status.from_string("Error")
    S_OK = Status.from_string("Completed")

    # All possible status values, ordered by increasing integer value.
    ALL_STATUS = [
        S_INIT,
        S_LOCKED,
        S_READY,
        S_SUB,
        S_RUN,
        S_DONE,
        S_ABICRITICAL,
        S_QCRITICAL,
        S_UNCONVERGED,
        #S_CANCELLED,
        S_ERROR,
        S_OK,
    ]

    # Color used to plot the network in networkx (RGB triplet with components in [0, 1]).
    color_rgb = np.array((105, 105, 105)) / 255
489
    def __init__(self):
        # When True, methods decorated with @check_spectator emit a warning.
        self._in_spectator_mode = False
        # Node identifier.
        self._node_id = get_newnode_id()

        # List of Dependency objects.
        self._deps = []

        # List of files (products) needed by this node.
        self._required_files = []

        # Used to push additional info during the execution.
        self.history = NodeHistory(maxlen=80)

        # Actions performed to fix abicritical events.
        self._corrections = NodeCorrections()

        # Set to true if the node has been finalized.
        self._finalized = False
        self._status = self.S_INIT
510
511    def __eq__(self, other):
512        if not isinstance(other, Node): return False
513        return self.node_id == other.node_id
514
515    def __ne__(self, other):
516        return not self.__eq__(other)
517
518    def __hash__(self):
519        return hash(self.node_id)
520
521    def __repr__(self):
522        try:
523            return "<%s, node_id=%s, workdir=%s>" % (
524                self.__class__.__name__, self.node_id, self.relworkdir)
525        except AttributeError:
526            # this usually happens when workdir has not been initialized
527            return "<%s, node_id=%s, workdir=None>" % (self.__class__.__name__, self.node_id)
528
529    #def __setattr__(self, name, value):
530    #    if self.in_spectator_mode:
531    #        raise RuntimeError("You should not call __setattr__ in spectator_mode")
532    #    return super().__setattr__(name,value)
533
534    @lazy_property
535    def color_hex(self):
536        """Node color as Hex Triplet https://en.wikipedia.org/wiki/Web_colors#Hex_triplet"""
537        def clamp(x):
538            return max(0, min(int(x), 255))
539
540        r, g, b = np.trunc(self.color_rgb * 255)
541        return "#{0:02x}{1:02x}{2:02x}".format(clamp(r), clamp(g), clamp(b))
542
543    def isinstance(self, class_or_string):
544        """
545        Check whether the node is a instance of `class_or_string`.
546        Unlinke the standard isinstance builtin, the method accepts either a class or a string.
547        In the later case, the string is compared with self.__class__.__name__ (case insensitive).
548        """
549        if class_or_string is None:
550            return False
551        import inspect
552        if inspect.isclass(class_or_string):
553            return isinstance(self, class_or_string)
554        else:
555            return self.__class__.__name__.lower() == class_or_string.lower()
556
557    @classmethod
558    def as_node(cls, obj):
559        """
560        Convert obj into a Node instance.
561
562        Return:
563            obj if obj is a Node instance,
564            cast obj to :class:`FileNode` instance of obj is a string.
565            None if obj is None
566        """
567        if isinstance(obj, cls):
568            return obj
569        elif is_string(obj):
570            # Assume filepath.
571            return FileNode(obj)
572        elif obj is None:
573            return obj
574        else:
575            raise TypeError("Don't know how to convert %s to Node instance." % obj)
576
577    @property
578    def name(self):
579        """
580        The name of the node
581        (only used for facilitating its identification in the user interface).
582        """
583        try:
584            return self._name
585        except AttributeError:
586            if self.is_task:
587                try:
588                    return self.pos_str
589                except Exception:
590                    return os.path.basename(self.workdir)
591            else:
592                return os.path.basename(self.workdir)
593
594    @property
595    def relworkdir(self):
596        """Return a relative version of the workdir"""
597        if getattr(self, "workdir", None) is None:
598            return None
599        try:
600            return os.path.relpath(self.workdir)
601        except OSError:
602            # current working directory may not be defined!
603            return self.workdir
604
    def set_name(self, name):
        """Set the name of the Node."""
        self._name = name

    @property
    def node_id(self):
        """Node identifier."""
        return self._node_id

    @check_spectator
    def set_node_id(self, node_id):
        """Set the node identifier. Use it carefully!"""
        # NOTE: equality and hashing are based on node_id, so changing it
        # affects the identity of the node in sets/dicts.
        self._node_id = node_id
618
    @property
    def finalized(self):
        """True if the `Node` has been finalized."""
        return self._finalized

    @finalized.setter
    def finalized(self, boolean):
        self._finalized = boolean
        # Record the change in the node history.
        self.history.info("Finalized set to %s" % self._finalized)

    @property
    def in_spectator_mode(self):
        """True if the node is in spectator mode i.e. methods with side effects should not be called."""
        return self._in_spectator_mode

    @in_spectator_mode.setter
    def in_spectator_mode(self, mode):
        self._in_spectator_mode = bool(mode)
        #self.history.info("in_spectator_mode set to %s" % mode)
637
    @property
    def corrections(self):
        """
        List of dictionaries with information on the actions performed to solve `AbiCritical` Events.
        Each dictionary contains the `AbinitEvent` who triggered the correction and
        a human-readable message with the description of the operation performed.
        """
        return self._corrections

    @property
    def num_corrections(self):
        """Number of corrections performed so far."""
        return len(self.corrections)

    def log_correction(self, event, action):
        """
        This method should be called once we have fixed the problem associated to this event.
        It adds a new entry in the correction history of the node.

        Args:
            event: :class:`AbinitEvent` that triggered the correction.
            action (str): Human-readable string with info on the action performed to solve the problem.
        """
        # TODO: Create CorrectionObject
        action = str(action)
        # Record the action in the node history as well.
        self.history.info(action)

        self._corrections.append(dict(
            event=event.as_dict(),
            action=action,
        ))
668
    @property
    def is_file(self):
        """True if this node is a file"""
        return isinstance(self, FileNode)

    @property
    def is_task(self):
        """True if this node is a Task"""
        # Imported lazily, presumably to avoid a circular import at module level.
        from .tasks import Task
        return isinstance(self, Task)

    @property
    def is_work(self):
        """True if this node is a Work"""
        from .works import Work
        return isinstance(self, Work)

    @property
    def is_flow(self):
        """True if this node is a Flow"""
        from .flows import Flow
        return isinstance(self, Flow)
691
    @property
    def deps(self):
        """
        List of :class:`Dependency` objects defining the dependencies
        of this `Node`. Empty list if this |Node| does not have dependencies.
        """
        return self._deps
699
    @check_spectator
    def add_deps(self, deps):
        """
        Add a list of dependencies to the |Node|.

        Args:
            deps: List of :class:`Dependency` objects specifying the dependencies of the node.
                  or dictionary mapping nodes to file extensions e.g. {task: "DEN"}
        """
        if isinstance(deps, collections.abc.Mapping):
            # Convert dictionary into list of dependencies.
            deps = [Dependency(node, exts) for node, exts in deps.items()]

        # We want a list
        if not isinstance(deps, (list, tuple)):
            deps = [deps]

        assert all(isinstance(d, Dependency) for d in deps)

        # Add the dependencies to the node and merge possibly duplicated keys.
        self._deps.extend(deps)
        self.merge_deps()

        if self.is_work:
            # The tasks in the work should inherit the same dependency.
            for task in self:
                task.add_deps(deps)
                task.merge_deps()

        # If we have a FileNode as dependency, add self to its children.
        # Node.get_parents will use this list if node.is_file.
        for dep in (d for d in deps if d.node.is_file):
            dep.node.add_filechild(self)
733
734    def merge_deps(self):
735        """
736        Group all extensions associated to the same node in a single list.
737        Useful for cases in which we may end up with the same node appearing more than once
738        in self.deps. See e.g. ``add_deps``.
739        """
740        from collections import defaultdict
741        node2exts = defaultdict(list)
742        for dep in self.deps:
743            node2exts[dep.node].extend(dep.exts)
744        self._deps = [Dependency(node, exts) for node, exts in node2exts.items()]
745
746    @check_spectator
747    def remove_deps(self, deps):
748        """
749        Remove a list of dependencies from the |Node|.
750
751        Args:
752            deps: List of :class:`Dependency` objects specifying the  dependencies of the node.
753        """
754        if not isinstance(deps, (list, tuple)):
755            deps = [deps]
756
757        assert all(isinstance(d, Dependency) for d in deps)
758
759        self._deps = [d for d in self._deps if d not in deps]
760
761        if self.is_work:
762            # remove the same list of dependencies from the task in the work
763            for task in self:
764                task.remove_deps(deps)
765
766    @property
767    def deps_status(self):
768        """Returns a list with the status of the dependencies."""
769        if not self.deps:
770            return [self.S_OK]
771
772        return [d.status for d in self.deps]
773
774    def depends_on(self, other):
775        """True if this node depends on the other node."""
776        return other in [d.node for d in self.deps]
777
778    def find_parent_with_ext(self, ext):
779        """
780        Return the parent (usually a |Task|) that produces the file with extension `ext`.
781        Raises ValueError if multiple parents are found.
782        Return None if no parent is found.
783        """
784        parent, count = None, 0
785        for dep in self.deps:
786            if ext in dep.exts:
787                parent = dep.node
788                count += 1
789
790        if count > 1:
791            raise ValueError("Cannot have multiple parents producing the same file extension!\n%s" % self.str_deps())
792
793        return parent
794
795    def get_parents(self):
796        """Return the list of nodes in the |Flow| required by this |Node|"""
797        return [d.node for d in self.deps]
798
799    def get_children(self):
800        """
801        Return the list of nodes in the |Flow| that depends on this |Node|
802
803        .. note::
804
805            This routine assumes the entire flow has been allocated.
806        """
807        # Specialized branch for FileNode.
808        if self.is_file:
809            return self.filechildren
810
811        # Inspect the entire flow to get children.
812        children = []
813        for work in self.flow:
814            if work.depends_on(self): children.append(work)
815            for task in work:
816                if task.depends_on(self): children.append(task)
817        return children
818
819    def str_deps(self):
820        """Return the string representation of the dependencies of the node."""
821        lines = []
822        app = lines.append
823
824        app("Dependencies of node %s:" % str(self))
825        for i, dep in enumerate(self.deps):
826            app("%d) %s, status=%s" % (i, dep.info, str(dep.status)))
827
828        return "\n".join(lines)
829
830    def get_vars_dataframe(self, *varnames):
831        """
832        Return pandas DataFrame with the value of the variables specified in `varnames`.
833        Can be used for task/works/flow. It's recursive!
834
835        .. example:
836
837            flow.get_vars_dataframe("ecut", "ngkpt")
838            work.get_vars_dataframe("acell", "usepawu")
839        """
840        import pandas as pd
841        if self.is_task:
842            df = pd.DataFrame([{v: self.input.get(v, None) for v in varnames}], index=[self.name], columns=varnames)
843            df["class"] = self.__class__.__name__
844            return df
845
846        elif self.is_work:
847            frames = [task.get_vars_dataframe(*varnames) for task in self]
848            return pd.concat(frames)
849
850        elif self.is_flow:
851            frames = [work.get_vars_dataframe(*varnames) for work in self]
852            return pd.concat(frames)
853
854        else:
855            #print("Ignoring node of type: `%s`" % type(self))
856            return pd.DataFrame(index=[self.name])
857
    def get_graphviz_dirtree(self, engine="automatic", **kwargs):
        """
        Generate directory graph in the DOT language. The graph show the files and directories
        in the node workdir.

        Args:
            engine: graphviz layout engine. "automatic" is mapped to "fdp".
            kwargs: extra options passed to Dirviz.get_cluster_graph.

        Returns: graphviz.Digraph <https://graphviz.readthedocs.io/en/stable/api.html#digraph>
        """
        if engine == "automatic":
            engine = "fdp"

        return Dirviz(self.workdir).get_cluster_graph(engine=engine, **kwargs)
869
870    def set_gc(self, gc):
871        """
872        Set the garbage collector.
873        """
874        assert isinstance(gc, GarbageCollector)
875        self._gc = gc
876
877    @property
878    def gc(self):
879        """
880        Garbage collector. None if garbage collection is deactivated.
881        Use flow.set_garbage_collector to initialize the object.
882        """
883        try:
884            return self._gc
885        except AttributeError:
886            #if not self.is_flow and self.flow.gc: return self.flow.gc
887            return None
888
889    @property
890    def event_handlers(self):
891        """
892        The list of handlers registered for this node.
893        If the node is not a `Flow` and does not have its own list of
894        `handlers` the handlers registered at the level of the flow are returned.
895
896        This trick allows one to registered different handlers at the level of the Task
897        for testing purposes. By default, we have a common list of handlers for all the nodes in the flow.
898        This choice facilitates the automatic installation of the handlers when we use callbacks to generate
899        new Works and Tasks!
900        """
901        if self.is_flow:
902            return self._event_handlers
903
904        try:
905            return self._event_handlers
906        except AttributeError:
907            return self.flow._event_handlers
908
909    @check_spectator
910    def install_event_handlers(self, categories=None, handlers=None):
911        """
912        Install the `EventHandlers for this `Node`. If no argument is provided
913        the default list of handlers is installed.
914
915        Args:
916            categories: List of categories to install e.g. base + can_change_physics
917            handlers: explicit list of :class:`EventHandler` instances.
918                      This is the most flexible way to install handlers.
919
920        .. note::
921
922            categories and handlers are mutually exclusive.
923        """
924        if categories is not None and handlers is not None:
925            raise ValueError("categories and handlers are mutually exclusive!")
926
927        from .events import get_event_handler_classes
928        if categories:
929            raise NotImplementedError()
930            handlers = [cls() for cls in get_event_handler_classes(categories=categories)]
931        else:
932            handlers = handlers or [cls() for cls in get_event_handler_classes()]
933
934        self._event_handlers = handlers
935
936    def show_event_handlers(self, stream=sys.stdout, verbose=0):
937        """Print to `stream` the event handlers installed for this flow."""
938        lines = ["List of event handlers installed:"]
939        for handler in self.event_handlers:
940            if verbose:
941                lines.extend(handler.__class__.cls2str().split("\n"))
942            else:
943                lines.extend(str(handler).split("\n"))
944
945        stream.write("\n".join(lines))
946        stream.write("\n")
947
948    def send_signal(self, signal):
949        """
950        Send signal from this node to all connected receivers unless the node is in spectator mode.
951
952        signal -- (hashable) signal value, see `dispatcher` connect for details
953
954        Return a list of tuple pairs [(receiver, response), ... ]
955        or None if the node is in spectator mode.
956
957        if any receiver raises an error, the error propagates back
958        through send, terminating the dispatch loop, so it is quite
959        possible to not have all receivers called if a raises an error.
960        """
961        if self.in_spectator_mode: return None
962        self.history.debug("Node %s broadcasts signal %s" % (self, signal))
963        dispatcher.send(signal=signal, sender=self)
964
965    ##########################
966    ### Abstract protocol ####
967    ##########################
968
    @property
    @abc.abstractmethod
    def status(self):
        """The status of the `Node`. Abstract property: concrete subclasses must implement it (see e.g. FileNode.status)."""
973
    @abc.abstractmethod
    def check_status(self):
        """Check the status of the `Node`. Abstract method implemented by concrete subclasses."""
977
978
class FileNode(Node):
    """
    A Node that consists of a file. The file may not exist yet.

    Mainly used to connect |Task| objects to external files produced in previous runs.
    """
    color_rgb = np.array((102, 51, 255)) / 255

    def __init__(self, filename):
        """
        Args:
            filename: Path to the file (stored internally as an absolute path).
        """
        super().__init__()
        self.filepath = os.path.abspath(filename)

        # Directories with input|output|temporary data.
        # For a FileNode they all coincide with the directory containing the file.
        self.workdir = os.path.dirname(self.filepath)

        self.indir = Directory(self.workdir)
        self.outdir = Directory(self.workdir)
        self.tmpdir = Directory(self.workdir)

        self._filechildren = []

    def __repr__(self):
        try:
            return "<%s, node_id=%s, rpath=%s>" % (
                self.__class__.__name__, self.node_id, os.path.relpath(self.filepath))
        except AttributeError:
            # this usually happens when workdir has not been initialized
            return "<%s, node_id=%s, path=%s>" % (self.__class__.__name__, self.node_id, self.filepath)

    @lazy_property
    def basename(self):
        """Basename of the file."""
        return os.path.basename(self.filepath)

    @property
    def products(self):
        """List with a single product built from the file wrapped by this node."""
        return [Product.from_file(self.filepath)]

    def opath_from_ext(self, ext):
        """Output path associated to ``ext``. For a FileNode this is always the file itself."""
        return self.filepath

    @property
    def status(self):
        """S_OK if the file exists on the filesystem, S_ERROR otherwise."""
        return self.S_OK if os.path.exists(self.filepath) else self.S_ERROR

    def check_status(self):
        """Check and return the status of the node."""
        return self.status

    def get_results(self, **kwargs):
        results = super().get_results(**kwargs)
        #results.register_gridfs_files(filepath=self.filepath)
        return results

    def add_filechild(self, node):
        """Add a node (usually Task) to the children of this FileNode."""
        self._filechildren.append(node)

    @property
    def filechildren(self):
        """List with the children (nodes) of this FileNode."""
        return self._filechildren

    # This part provides IO capabilities to FileNode with API similar to the one implemented in Task.
    # We may need it at runtime to extract information from netcdf files e.g.
    # a NscfTask will change the FFT grid to match the one used in the GsTask.

    def abiopen(self):
        """Open the file with ``abilab.abiopen`` and return the file object."""
        from abipy import abilab
        return abilab.abiopen(self.filepath)

    def open_gsr(self):
        """Open the GSR.nc file associated to this node."""
        return self._abiopen_abiext("_GSR.nc")

    def _abiopen_abiext(self, abiext):
        """
        Open the file with abinit extension ``abiext``. If self.filepath does
        not end with ``abiext``, warn and look for a matching file in the same
        directory before falling back to self.filepath.
        """
        import glob
        from abipy import abilab
        if not self.filepath.endswith(abiext):
            msg = """\n
File type does not match the abinit file extension.
Caller asked for abiext: `%s` whereas filepath: `%s`.
Continuing anyway assuming that the netcdf file provides the API/dims/vars needed by the caller.
""" % (abiext, self.filepath)
            self.history.warning(msg)

        # Try to find a file with the requested extension in the same directory.
        dirpath = os.path.dirname(self.filepath)
        glob_result = glob.glob(os.path.join(dirpath, "*%s" % abiext))
        if glob_result: return abilab.abiopen(glob_result[0])
        return self.abiopen()
1068
1069
class HistoryRecord(object):
    """
    A `HistoryRecord` instance represents an entry in the :class:`NodeHistory`.

    `HistoryRecord` instances are created every time something is logged.
    They contain all the information pertinent to the event being logged.
    The main information passed in is in msg and args, which are combined
    using str(msg) % args to create the message field of the record.
    The record also includes information such as when the record was created,
    the source line where the logging call was made

    .. attribute:: levelno

        Numeric logging level for the message (DEBUG, INFO, WARNING, ERROR, CRITICAL)

    .. attribute:: levelname

        Text logging level for the message ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")

    .. attribute:: pathname

        Full pathname of the source file where the logging call was issued (if available)

    .. attribute:: filename

        Filename portion of pathname

    .. attribute:: module

        Module (name portion of filename)

    .. attribute:: lineno

        Source line number where the logging call was issued (if available)

    .. attribute:: func_name

        Function name

    .. attribute:: created

        Time when the HistoryRecord was created (time.time() return value)

    .. attribute:: asctime

        Textual time when the HistoryRecord was created

    .. attribute:: message
        The result of record.getMessage(), computed just as the record is emitted
    """
    def __init__(self, level, pathname, lineno, msg, args, exc_info, func=None):
        """
        Initialize a logging record with interesting information.
        """
        #
        # The following statement allows passing of a dictionary as a sole
        # argument, so that you can do something like
        #  logging.debug("a %(a)d b %(b)s", {'a':1, 'b':2})
        # Suggested by Stefan Behnel.
        # Note that without the test for args[0], we get a problem because
        # during formatting, we test to see if the arg is present using
        # 'if self.args:'. If the event being logged is e.g. 'Value is %d'
        # and if the passed arg fails 'if self.args:' then no formatting
        # is done. For example, logger.warn('Value is %d', 0) would log
        # 'Value is %d' instead of 'Value is 0'.
        # For the use case of passing a dictionary, this should not be a problem.
        if args and len(args) == 1 and isinstance(args[0], dict) and args[0]:
            args = args[0]
        self.args = args
        self.levelno = level
        self.pathname = pathname
        self.msg = msg

        # Human-readable level name. NodeHistory passes the name directly
        # (e.g. "INFO"); map numeric levels with logging.getLevelName.
        # (This used to be the hard-coded placeholder "FOOBAR", which
        # contradicted the documented contract of `levelname`.)
        self.levelname = level if is_string(level) else logging.getLevelName(level)

        try:
            self.filename = os.path.basename(pathname)
            self.module = os.path.splitext(self.filename)[0]
        except (TypeError, ValueError, AttributeError):
            self.filename = pathname
            self.module = "Unknown module"

        self.exc_info = exc_info
        self.exc_text = None      # used to cache the traceback text
        self.lineno = lineno
        self.func_name = func
        self.created = time.time()
        self.asctime = time.asctime()
        # Remove milliseconds
        i = self.asctime.find(".")
        if i != -1: self.asctime = self.asctime[:i]

    def __repr__(self):
        return '<%s, %s, %s, %s,\n"%s">' % (self.__class__.__name__, self.levelno, self.pathname, self.lineno, self.msg)

    def __str__(self):
        return self.get_message(metadata=False)

    def get_message(self, metadata=False, asctime=True):
        """
        Return the message after merging any user-supplied arguments with the message.

        Args:
            metadata: True if function and module name should be added.
            asctime: True if time string should be added.
        """
        msg = self.msg if is_string(self.msg) else str(self.msg)
        if self.args:
            try:
                msg = msg % self.args
            except Exception:
                # Best-effort: never let a bad format string break the history.
                msg += str(self.args)

        if asctime: msg = "[" + self.asctime + "] " + msg

        # Add metadata
        if metadata:
            msg += "\nCalled by %s at %s:%s\n" % (self.func_name, self.pathname, self.lineno)

        return msg

    @pmg_serialize
    def as_dict(self):
        """JSON-serializable dict representation (MSONable protocol)."""
        return {'level': self.levelno, 'pathname': self.pathname, 'lineno': self.lineno, 'msg': self.msg,
                'args': self.args, 'exc_info': self.exc_info, 'func': self.func_name}

    @classmethod
    def from_dict(cls, d):
        """Reconstruct the record from the dict produced by as_dict."""
        return cls(level=d['level'], pathname=d['pathname'], lineno=int(d['lineno']), msg=d['msg'], args=d['args'],
                   exc_info=d['exc_info'], func=d['func'])
1200
1201
class NodeHistory(collections.deque):
    """Logger-like object storing :class:`HistoryRecord` instances."""

    def __str__(self):
        return self.to_string()

    def to_string(self, metadata=False):
        """Returns  a string with the history. Set metadata to True to have info on function and module."""
        return "\n".join(rec.get_message(metadata=metadata) for rec in self)

    def info(self, msg, *args, **kwargs):
        """Log 'msg % args' with the info severity level"""
        self._log("INFO", msg, args, exc_info=kwargs.get("exc_info"))

    def warning(self, msg, *args, **kwargs):
        """Log 'msg % args' with the warning severity level"""
        self._log("WARNING", msg, args, exc_info=kwargs.get("exc_info"))

    def critical(self, msg, *args, **kwargs):
        """Log 'msg % args' with the critical severity level"""
        self._log("CRITICAL", msg, args, exc_info=kwargs.get("exc_info"))

    def debug(self, msg, *args, **kwargs):
        """Log 'msg % args' with the debug severity level"""
        self._log("DEBUG", msg, args, exc_info=kwargs.get("exc_info"))

    def _log(self, level, msg, args, exc_info=None, extra=None):
        """Low-level logging routine which creates a :class:`HistoryRecord`."""
        # Note: the public methods above used to pass the whole kwargs dict
        # positionally into `exc_info`, so ANY keyword argument triggered
        # sys.exc_info() capture. Now only an explicit exc_info is honored.
        if exc_info and not isinstance(exc_info, tuple):
            exc_info = sys.exc_info()

        self.append(HistoryRecord(level, "unknown filename", 0, msg, args, exc_info, func="unknown func"))
1234
1235
class NodeCorrections(list):
    """Iterable storing the corrections performed by the :class:`EventHandler`"""
    #TODO
    # Correction should have a human-readable message
    # and a list of operations in JSON format (Modder?) so that
    # we can read them and re-apply the corrections to another task if needed.

    #def count_event_class(self, event_class):
    #    """
    #    Return the number of times the event class has been already fixed.
    #    """
    #    #return len([c for c in self if c["event"]["@class"] == str(event_class)])

    #def _find(self, event_class)
1250
1251
class GarbageCollector(object):
    """
    Stores the parameters governing the garbage collection of output files.
    """
    def __init__(self, exts, policy):
        """
        Args:
            exts: Iterable with the file extensions to be removed
                (stored internally as a set, duplicates are collapsed).
            policy: Name of the policy controlling when files are collected
                (interpreted by the flow/task machinery, not validated here).
        """
        self.exts, self.policy = set(exts), policy
1256
1257
# The code below initializes a counter from a file when the module is imported
# and save the counter's updated value automatically when the program terminates
# without relying on the application making an explicit call into this module at termination.

# Id of the last node created in this process. Lazily synced with
# _COUNTER_FILE by init_counter(); None until initialized.
_COUNTER = None
# File persisting the counter across python sessions (a single integer).
_COUNTER_FILE = os.path.join(os.path.expanduser("~"), ".abinit", "abipy", "nodecounter")
1264
1265
def init_counter():
    """
    Initialize the global node counter ``_COUNTER`` from ``_COUNTER_FILE``,
    creating the directory and the file (seeded with -1) if not yet present.
    No-op if the counter has already been initialized in this process.
    """
    global _COUNTER

    # Make dir and file if not present. exist_ok avoids the race between
    # the exists() test and makedirs() when several processes start at once.
    os.makedirs(os.path.dirname(_COUNTER_FILE), exist_ok=True)

    if not os.path.exists(_COUNTER_FILE):
        with open(_COUNTER_FILE, "wt") as fh:
            fh.write("%d\n" % -1)

    if _COUNTER is None:
        with open(_COUNTER_FILE, "r") as fh:
            s = fh.read().strip()
            # An empty file is treated as if it contained -1.
            if not s: s = "-1"
            _COUNTER = int(s)
1282
1283
def get_newnode_id():
    """
    Returns a new node identifier used for |Task|, |Work| and |Flow| objects.

    .. warning:

        The id is unique inside the same python process so be careful when
        Works and Tasks are constructed at run-time or when threads are used.
    """
    global _COUNTER
    # Make sure the counter has been loaded from file before bumping it.
    init_counter()
    _COUNTER = _COUNTER + 1
    return _COUNTER
1298
1299
def save_lastnode_id():
    """Save the id of the last node created."""
    init_counter()

    # Lock the counter file and replace it atomically with the new value.
    with FileLock(_COUNTER_FILE), AtomicFile(_COUNTER_FILE, mode="w") as fh:
        fh.write("%d\n" % _COUNTER)
1307
1308
# Register function atexit so that the updated counter is written back
# to _COUNTER_FILE automatically when the interpreter shuts down.
import atexit
atexit.register(save_lastnode_id)
1312