1# coding: utf-8 2""" 3This module defines the Node class that is inherited by Task, Work and Flow objects. 4""" 5 6import sys 7import os 8import time 9import collections 10import abc 11import numpy as np 12 13from collections import OrderedDict 14from pprint import pprint 15from pymatgen.util.io_utils import AtomicFile 16from pydispatch import dispatcher 17from monty.termcolor import colored 18from monty.serialization import loadfn 19from monty.string import is_string 20from monty.io import FileLock 21from monty.collections import AttrDict, Namespace 22from monty.functools import lazy_property 23from monty.json import MSONable 24from pymatgen.util.serialization import json_pretty_dump, pmg_serialize 25from .utils import File, Directory, Dirviz, irdvars_for_ext, abi_extensions 26 27 28import logging 29logger = logging.getLogger(__name__) 30 31 32def _2attrs(item): 33 return item if item is None or isinstance(list, tuple) else (item,) 34 35 36class Status(int): 37 """This object is an integer representing the status of the `Node`.""" 38 39 # Possible status of the node. See monty.termocolor for the meaning of color, on_color and attrs. 40 _STATUS_INFO = [ 41 # (value, name, color, on_color, attrs) 42 # Node has been initialized 43 (1, "Initialized", None, None, None), 44 # Task is locked an must be explicitly unlocked by an external subject (Work). 45 (2, "Locked", "grey", None, None), 46 # Node is ready i.e. all the depencies of the node have status S_OK 47 (3, "Ready", None, None, None), 48 # Node has been submitted (The `Task` is running or we have started to finalize the Work) 49 (4, "Submitted", "blue", None, None), 50 # Node is running. 51 (5, "Running", "magenta", None, None), 52 # Node done, This does not imply that results are ok or that the calculation completed successfully 53 (6, "Done", None, None, None), 54 # raised an Error by ABINIT. 55 (7, "AbiCritical", "red", None, None), 56 # Node raised an Error by submitting submission script, or by executing it 57 (8, "QCritical", "red", "on_white", None), 58 # This usually means that an iterative algorithm didn't converge. 59 (9, "Unconverged", "red", "on_yellow", None), 60 # Node raised an unrecoverable error, usually raised when an attempt to fix one of other types failed. 61 (10, "Error", "red", None, None), 62 # Execution completed successfully. 63 (11, "Completed", "green", None, None), 64 ] 65 _STATUS2STR = OrderedDict([(t[0], t[1]) for t in _STATUS_INFO]) 66 _STATUS2COLOR_OPTS = OrderedDict([(t[0], {"color": t[2], "on_color": t[3], "attrs": _2attrs(t[4])}) for t in _STATUS_INFO]) 67 68 def __repr__(self): 69 return "<%s: %s, at %s>" % (self.__class__.__name__, str(self), id(self)) 70 71 def __str__(self): 72 """String representation.""" 73 return self._STATUS2STR[self] 74 75 @classmethod 76 def as_status(cls, obj): 77 """Convert obj into Status.""" 78 if obj is None: return None 79 return obj if isinstance(obj, cls) else cls.from_string(obj) 80 81 @classmethod 82 def from_string(cls, s): 83 """Return a `Status` instance from its string representation.""" 84 for num, text in cls._STATUS2STR.items(): 85 if text == s: 86 return cls(num) 87 else: 88 raise ValueError("Wrong string %s" % s) 89 90 @classmethod 91 def all_status_strings(cls): 92 """List of strings with all possible values status.""" 93 return [info[1] for info in cls._STATUS_INFO] 94 95 @property 96 def is_critical(self): 97 """True if status is critical.""" 98 return str(self) in ("AbiCritical", "QCritical", "Unconverged", "Error") 99 100 @property 101 def color_opts(self): 102 return self._STATUS2COLOR_OPTS[self] 103 104 @property 105 def colored(self): 106 """Return colorized text used to print the status if the stream supports it.""" 107 return colored(str(self), **self.color_opts) 108 109 110class Dependency(object): 111 """ 112 This object describes the dependencies among the nodes of a calculation. 113 114 A `Dependency` consists of a `Node` that produces a list of products (files) 115 that are used by the other nodes (`Task` or `Work`) to start the calculation. 116 One usually creates the object by calling work.register 117 118 Example: 119 120 # Register the SCF task in work. 121 scf_task = work.register(scf_strategy) 122 123 # Register the NSCF calculation and its dependency on the SCF run via deps. 124 nscf_task = work.register(nscf_strategy, deps={scf_task: "DEN"}) 125 """ 126 def __init__(self, node, exts=None): 127 """ 128 Args: 129 node: The task or the worfklow associated to the dependency or string with a filepath. 130 exts: Extensions of the output files that are needed for running the other tasks. 131 """ 132 self._node = Node.as_node(node) 133 134 if exts and is_string(exts): exts = exts.split() 135 136 # Extract extensions. 137 self.exts = [e for e in exts if not e.startswith("@")] 138 139 # Save getters 140 self.getters = [e for e in exts if e.startswith("@")] 141 #if self.getters: print(self.getters) 142 143 def __hash__(self): 144 return hash(self._node) 145 146 def __repr__(self): 147 return "node %s will produce: %s " % (repr(self.node), repr(self.exts)) 148 149 def __str__(self): 150 return "node %s will produce: %s " % (str(self.node), str(self.exts)) 151 152 @property 153 def info(self): 154 return str(self.node) 155 156 @property 157 def node(self): 158 """The |Node| associated to the dependency.""" 159 return self._node 160 161 @property 162 def status(self): 163 """The status of the dependency, i.e. the status of the |Node|.""" 164 return self.node.status 165 166 @lazy_property 167 def products(self): 168 """List of output files produces by self.""" 169 _products = [] 170 for ext in self.exts: 171 prod = Product(ext, self.node.opath_from_ext(ext)) 172 _products.append(prod) 173 174 return _products 175 176 def apply_getters(self, task): 177 """ 178 This function is called when we specify the task dependencies with the syntax: 179 180 deps={node: "@property"} 181 182 In this case the task has to the get `property` from `node` before starting the calculation. 183 184 At present, the following properties are supported: 185 186 - @structure 187 """ 188 if not self.getters: return 189 190 for getter in self.getters: 191 if getter == "@structure": 192 task.history.info("Getting structure from %s" % self.node) 193 new_structure = self.node.get_final_structure() 194 task._change_structure(new_structure) 195 else: 196 raise ValueError("Wrong getter %s" % getter) 197 198 def connecting_vars(self): 199 """ 200 Returns a dictionary with the variables that must be added to the 201 input file in order to connect this |Node| to its dependencies. 202 """ 203 vars = {} 204 for prod in self.products: 205 vars.update(prod.connecting_vars()) 206 207 return vars 208 209 def get_filepaths_and_exts(self): 210 """Returns the paths of the output files produced by self and its extensions""" 211 filepaths = [prod.filepath for prod in self.products] 212 exts = [prod.ext for prod in self.products] 213 214 return filepaths, exts 215 216 217class Product(object): 218 """ 219 A product represents an output file produced by ABINIT instance. 220 This file is needed to start another `Task` or another `Work`. 221 """ 222 def __init__(self, ext, path): 223 """ 224 Args: 225 ext: ABINIT file extension 226 path: (asbolute) filepath 227 """ 228 if ext not in abi_extensions(): 229 raise ValueError("Extension `%s` has not been registered in the internal database" % str(ext)) 230 231 self.ext = ext 232 self.file = File(path) 233 234 @classmethod 235 def from_file(cls, filepath): 236 """Build a :class:`Product` instance from a filepath.""" 237 # Find the abinit extension. 238 for i in range(len(filepath)): 239 if filepath[i:] in abi_extensions(): 240 ext = filepath[i:] 241 break 242 else: 243 raise ValueError("Cannot detect abinit extension in %s" % filepath) 244 245 return cls(ext, filepath) 246 247 def __str__(self): 248 return "File=%s, Extension=%s, " % (self.file.path, self.ext) 249 250 @property 251 def filepath(self): 252 """Absolute path of the file.""" 253 return self.file.path 254 255 def connecting_vars(self): 256 """ 257 Returns a dictionary with the ABINIT variables that 258 must be used to make the code use this file. 259 """ 260 return irdvars_for_ext(self.ext) 261 262 263class GridFsFile(AttrDict): 264 """Information on a file that will stored in the MongoDb gridfs collection.""" 265 def __init__(self, path, fs_id=None, mode="b"): 266 super().__init__(path=path, fs_id=fs_id, mode=mode) 267 268 269class NodeResults(dict, MSONable): 270 """Dictionary used to store the most important results produced by a |Node|.""" 271 JSON_SCHEMA = { 272 "type": "object", 273 "properties": { 274 "node_id": {"type": "integer", "required": True}, 275 "node_finalized": {"type": "boolean", "required": True}, 276 "node_history": {"type": "array", "required": True}, 277 "node_class": {"type": "string", "required": True}, 278 "node_name": {"type": "string", "required": True}, 279 "node_status": {"type": "string", "required": True}, 280 "in": {"type": "object", "required": True, "description": "dictionary with input parameters"}, 281 "out": {"type": "object", "required": True, "description": "dictionary with the output results"}, 282 "exceptions": {"type": "array", "required": True}, 283 "files": {"type": "object", "required": True}, 284 }, 285 } 286 287 @classmethod 288 def from_node(cls, node): 289 """Initialize an instance of `NodeResults` from a `Node` subclass.""" 290 kwargs = dict( 291 node_id=node.node_id, 292 node_finalized=node.finalized, 293 node_history=list(node.history), 294 node_name=node.name, 295 node_class=node.__class__.__name__, 296 node_status=str(node.status), 297 ) 298 299 return node.Results(node, **kwargs) 300 301 def __init__(self, node, **kwargs): 302 super().__init__(**kwargs) 303 self.node = node 304 305 if "in" not in self: self["in"] = Namespace() 306 if "out" not in self: self["out"] = Namespace() 307 if "exceptions" not in self: self["exceptions"] = [] 308 if "files" not in self: self["files"] = Namespace() 309 310 @property 311 def exceptions(self): 312 return self["exceptions"] 313 314 @property 315 def gridfs_files(self): 316 """List with the absolute paths of the files to be put in GridFs.""" 317 return self["files"] 318 319 def register_gridfs_files(self, **kwargs): 320 """ 321 This function registers the files that will be saved in GridFS. 322 kwargs is a dictionary mapping the key associated to the file (usually the extension) 323 to the absolute path. By default, files are assumed to be in binary form, for formatted files 324 one should pass a tuple ("filepath", "t"). 325 326 Example:: 327 328 results.register_gridfs(GSR="path/to/GSR.nc", text_file=("/path/to/txt_file", "t")) 329 330 The GSR file is a binary file, whereas text_file is a text file. 331 """ 332 d = {} 333 for k, v in kwargs.items(): 334 mode = "b" 335 if isinstance(v, (list, tuple)): v, mode = v 336 d[k] = GridFsFile(path=v, mode=mode) 337 338 self["files"].update(d) 339 return self 340 341 def push_exceptions(self, *exceptions): 342 for exc in exceptions: 343 newstr = str(exc) 344 if newstr not in self.exceptions: 345 self["exceptions"] += [newstr,] 346 347 @pmg_serialize 348 def as_dict(self): 349 return self.copy() 350 351 @classmethod 352 def from_dict(cls, d): 353 return cls({k: v for k, v in d.items() if k not in ("@module", "@class")}) 354 355 def json_dump(self, filename): 356 json_pretty_dump(self.as_dict(), filename) 357 358 @classmethod 359 def json_load(cls, filename): 360 return cls.from_dict(loadfn(filename)) 361 362 def validate_json_schema(self): 363 import validictory 364 d = self.as_dict() 365 try: 366 validictory.validate(d, self.JSON_SCHEMA) 367 return True 368 except ValueError as exc: 369 pprint(d) 370 print(exc) 371 return False 372 373 def update_collection(self, collection): 374 """ 375 Update a mongodb collection. 376 """ 377 node = self.node 378 flow = node if node.is_flow else node.flow 379 380 # Build the key used to store the entry in the document. 381 key = node.name 382 if node.is_task: 383 key = "w" + str(node.pos[0]) + "_t" + str(node.pos[1]) 384 elif node.is_work: 385 key = "w" + str(node.pos) 386 387 db = collection.database 388 389 # Save files with GridFs first in order to get the ID. 390 if self.gridfs_files: 391 import gridfs 392 fs = gridfs.GridFS(db) 393 for ext, gridfile in self.gridfs_files.items(): 394 logger.info("gridfs: about to put file:", str(gridfile)) 395 # Here we set gridfile.fs_id that will be stored in the mondodb document 396 try: 397 with open(gridfile.path, "r" + gridfile.mode) as f: 398 gridfile.fs_id = fs.put(f, filename=gridfile.path) 399 except IOError as exc: 400 logger.critical(str(exc)) 401 402 if flow.mongo_id is None: 403 # Flow does not have a mongo_id, allocate doc for the flow and save its id. 404 flow.mongo_id = collection.insert({}) 405 print("Creating flow.mongo_id", flow.mongo_id, type(flow.mongo_id)) 406 407 # Get the document from flow.mongo_id and update it. 408 doc = collection.find_one({"_id": flow.mongo_id}) 409 if key in doc: 410 raise ValueError("%s is already in doc!" % key) 411 doc[key] = self.as_dict() 412 413 collection.save(doc) 414 #collection.update({'_id':mongo_id}, {"$set": doc}, upsert=False) 415 416 417def check_spectator(node_method): 418 """ 419 Decorator for |Node| methods. Raise `SpectatorNodeError`. 420 """ 421 from functools import wraps 422 @wraps(node_method) 423 def wrapper(*args, **kwargs): 424 node = args[0] 425 if node.in_spectator_mode: 426 #raise node.SpectatorError("You should not call this method when the node in spectator_mode") 427 import warnings 428 warnings.warn("You should not call %s when the node in spectator_mode" % str(node_method)) 429 430 return node_method(*args, **kwargs) 431 432 return wrapper 433 434 435class NodeError(Exception): 436 """Base Exception raised by |Node| subclasses""" 437 438 439class SpectatorNodeError(NodeError): 440 """ 441 Exception raised by |Node| methods when the node is in spectator mode 442 and we are calling a method with side effects. 443 """ 444 445 446class Node(metaclass=abc.ABCMeta): 447 """ 448 Abstract base class defining the interface that must be 449 implemented by the nodes of the calculation. 450 451 Nodes are hashable and can be tested for equality 452 """ 453 Results = NodeResults 454 455 Error = NodeError 456 SpectatorError = SpectatorNodeError 457 458 # Possible status of the node. 459 S_INIT = Status.from_string("Initialized") 460 S_LOCKED = Status.from_string("Locked") 461 S_READY = Status.from_string("Ready") 462 S_SUB = Status.from_string("Submitted") 463 S_RUN = Status.from_string("Running") 464 S_DONE = Status.from_string("Done") 465 S_ABICRITICAL = Status.from_string("AbiCritical") 466 S_QCRITICAL = Status.from_string("QCritical") 467 S_UNCONVERGED = Status.from_string("Unconverged") 468 #S_CANCELLED = Status.from_string("Cancelled") 469 S_ERROR = Status.from_string("Error") 470 S_OK = Status.from_string("Completed") 471 472 ALL_STATUS = [ 473 S_INIT, 474 S_LOCKED, 475 S_READY, 476 S_SUB, 477 S_RUN, 478 S_DONE, 479 S_ABICRITICAL, 480 S_QCRITICAL, 481 S_UNCONVERGED, 482 #S_CANCELLED, 483 S_ERROR, 484 S_OK, 485 ] 486 487 # Color used to plot the network in networkx 488 color_rgb = np.array((105, 105, 105)) / 255 489 490 def __init__(self): 491 self._in_spectator_mode = False 492 # Node identifier. 493 self._node_id = get_newnode_id() 494 495 # List of dependencies 496 self._deps = [] 497 498 # List of files (products) needed by this node. 499 self._required_files = [] 500 501 # Used to push additional info during the execution. 502 self.history = NodeHistory(maxlen=80) 503 504 # Actions performed to fix abicritical events. 505 self._corrections = NodeCorrections() 506 507 # Set to true if the node has been finalized. 508 self._finalized = False 509 self._status = self.S_INIT 510 511 def __eq__(self, other): 512 if not isinstance(other, Node): return False 513 return self.node_id == other.node_id 514 515 def __ne__(self, other): 516 return not self.__eq__(other) 517 518 def __hash__(self): 519 return hash(self.node_id) 520 521 def __repr__(self): 522 try: 523 return "<%s, node_id=%s, workdir=%s>" % ( 524 self.__class__.__name__, self.node_id, self.relworkdir) 525 except AttributeError: 526 # this usually happens when workdir has not been initialized 527 return "<%s, node_id=%s, workdir=None>" % (self.__class__.__name__, self.node_id) 528 529 #def __setattr__(self, name, value): 530 # if self.in_spectator_mode: 531 # raise RuntimeError("You should not call __setattr__ in spectator_mode") 532 # return super().__setattr__(name,value) 533 534 @lazy_property 535 def color_hex(self): 536 """Node color as Hex Triplet https://en.wikipedia.org/wiki/Web_colors#Hex_triplet""" 537 def clamp(x): 538 return max(0, min(int(x), 255)) 539 540 r, g, b = np.trunc(self.color_rgb * 255) 541 return "#{0:02x}{1:02x}{2:02x}".format(clamp(r), clamp(g), clamp(b)) 542 543 def isinstance(self, class_or_string): 544 """ 545 Check whether the node is a instance of `class_or_string`. 546 Unlinke the standard isinstance builtin, the method accepts either a class or a string. 547 In the later case, the string is compared with self.__class__.__name__ (case insensitive). 548 """ 549 if class_or_string is None: 550 return False 551 import inspect 552 if inspect.isclass(class_or_string): 553 return isinstance(self, class_or_string) 554 else: 555 return self.__class__.__name__.lower() == class_or_string.lower() 556 557 @classmethod 558 def as_node(cls, obj): 559 """ 560 Convert obj into a Node instance. 561 562 Return: 563 obj if obj is a Node instance, 564 cast obj to :class:`FileNode` instance of obj is a string. 565 None if obj is None 566 """ 567 if isinstance(obj, cls): 568 return obj 569 elif is_string(obj): 570 # Assume filepath. 571 return FileNode(obj) 572 elif obj is None: 573 return obj 574 else: 575 raise TypeError("Don't know how to convert %s to Node instance." % obj) 576 577 @property 578 def name(self): 579 """ 580 The name of the node 581 (only used for facilitating its identification in the user interface). 582 """ 583 try: 584 return self._name 585 except AttributeError: 586 if self.is_task: 587 try: 588 return self.pos_str 589 except Exception: 590 return os.path.basename(self.workdir) 591 else: 592 return os.path.basename(self.workdir) 593 594 @property 595 def relworkdir(self): 596 """Return a relative version of the workdir""" 597 if getattr(self, "workdir", None) is None: 598 return None 599 try: 600 return os.path.relpath(self.workdir) 601 except OSError: 602 # current working directory may not be defined! 603 return self.workdir 604 605 def set_name(self, name): 606 """Set the name of the Node.""" 607 self._name = name 608 609 @property 610 def node_id(self): 611 """Node identifier.""" 612 return self._node_id 613 614 @check_spectator 615 def set_node_id(self, node_id): 616 """Set the node identifier. Use it carefully!""" 617 self._node_id = node_id 618 619 @property 620 def finalized(self): 621 """True if the `Node` has been finalized.""" 622 return self._finalized 623 624 @finalized.setter 625 def finalized(self, boolean): 626 self._finalized = boolean 627 self.history.info("Finalized set to %s" % self._finalized) 628 629 @property 630 def in_spectator_mode(self): 631 return self._in_spectator_mode 632 633 @in_spectator_mode.setter 634 def in_spectator_mode(self, mode): 635 self._in_spectator_mode = bool(mode) 636 #self.history.info("in_spectator_mode set to %s" % mode) 637 638 @property 639 def corrections(self): 640 """ 641 List of dictionaries with infornation on the actions performed to solve `AbiCritical` Events. 642 Each dictionary contains the `AbinitEvent` who triggered the correction and 643 a human-readable message with the description of the operation performed. 644 """ 645 return self._corrections 646 647 @property 648 def num_corrections(self): 649 return len(self.corrections) 650 651 def log_correction(self, event, action): 652 """ 653 This method should be called once we have fixed the problem associated to this event. 654 It adds a new entry in the correction history of the node. 655 656 Args: 657 event: :class:`AbinitEvent` that triggered the correction. 658 action (str): Human-readable string with info on the action perfomed to solve the problem. 659 """ 660 # TODO: Create CorrectionObject 661 action = str(action) 662 self.history.info(action) 663 664 self._corrections.append(dict( 665 event=event.as_dict(), 666 action=action, 667 )) 668 669 @property 670 def is_file(self): 671 """True if this node is a file""" 672 return isinstance(self, FileNode) 673 674 @property 675 def is_task(self): 676 """True if this node is a Task""" 677 from .tasks import Task 678 return isinstance(self, Task) 679 680 @property 681 def is_work(self): 682 """True if this node is a Work""" 683 from .works import Work 684 return isinstance(self, Work) 685 686 @property 687 def is_flow(self): 688 """True if this node is a Flow""" 689 from .flows import Flow 690 return isinstance(self, Flow) 691 692 @property 693 def deps(self): 694 """ 695 List of :class:`Dependency` objects defining the dependencies 696 of this `Node`. Empty list if this |Node| does not have dependencies. 697 """ 698 return self._deps 699 700 @check_spectator 701 def add_deps(self, deps): 702 """ 703 Add a list of dependencies to the |Node|. 704 705 Args: 706 deps: List of :class:`Dependency` objects specifying the dependencies of the node. 707 or dictionary mapping nodes to file extensions e.g. {task: "DEN"} 708 """ 709 if isinstance(deps, collections.abc.Mapping): 710 # Convert dictionary into list of dependencies. 711 deps = [Dependency(node, exts) for node, exts in deps.items()] 712 713 # We want a list 714 if not isinstance(deps, (list, tuple)): 715 deps = [deps] 716 717 assert all(isinstance(d, Dependency) for d in deps) 718 719 # Add the dependencies to the node and merge possibly duplicated keys. 720 self._deps.extend(deps) 721 self.merge_deps() 722 723 if self.is_work: 724 # The task in the work should inherit the same dependency. 725 for task in self: 726 task.add_deps(deps) 727 task.merge_deps() 728 729 # If we have a FileNode as dependency, add self to its children 730 # Node.get_parents will use this list if node.is_isfile. 731 for dep in (d for d in deps if d.node.is_file): 732 dep.node.add_filechild(self) 733 734 def merge_deps(self): 735 """ 736 Group all extensions associated to the same node in a single list. 737 Useful for cases in which we may end up with the same node appearing more than once 738 in self.deps. See e.g. ``add_deps``. 739 """ 740 from collections import defaultdict 741 node2exts = defaultdict(list) 742 for dep in self.deps: 743 node2exts[dep.node].extend(dep.exts) 744 self._deps = [Dependency(node, exts) for node, exts in node2exts.items()] 745 746 @check_spectator 747 def remove_deps(self, deps): 748 """ 749 Remove a list of dependencies from the |Node|. 750 751 Args: 752 deps: List of :class:`Dependency` objects specifying the dependencies of the node. 753 """ 754 if not isinstance(deps, (list, tuple)): 755 deps = [deps] 756 757 assert all(isinstance(d, Dependency) for d in deps) 758 759 self._deps = [d for d in self._deps if d not in deps] 760 761 if self.is_work: 762 # remove the same list of dependencies from the task in the work 763 for task in self: 764 task.remove_deps(deps) 765 766 @property 767 def deps_status(self): 768 """Returns a list with the status of the dependencies.""" 769 if not self.deps: 770 return [self.S_OK] 771 772 return [d.status for d in self.deps] 773 774 def depends_on(self, other): 775 """True if this node depends on the other node.""" 776 return other in [d.node for d in self.deps] 777 778 def find_parent_with_ext(self, ext): 779 """ 780 Return the parent (usually a |Task|) that produces the file with extension `ext`. 781 Raises ValueError if multiple parents are found. 782 Return None if no parent is found. 783 """ 784 parent, count = None, 0 785 for dep in self.deps: 786 if ext in dep.exts: 787 parent = dep.node 788 count += 1 789 790 if count > 1: 791 raise ValueError("Cannot have multiple parents producing the same file extension!\n%s" % self.str_deps()) 792 793 return parent 794 795 def get_parents(self): 796 """Return the list of nodes in the |Flow| required by this |Node|""" 797 return [d.node for d in self.deps] 798 799 def get_children(self): 800 """ 801 Return the list of nodes in the |Flow| that depends on this |Node| 802 803 .. note:: 804 805 This routine assumes the entire flow has been allocated. 806 """ 807 # Specialized branch for FileNode. 808 if self.is_file: 809 return self.filechildren 810 811 # Inspect the entire flow to get children. 812 children = [] 813 for work in self.flow: 814 if work.depends_on(self): children.append(work) 815 for task in work: 816 if task.depends_on(self): children.append(task) 817 return children 818 819 def str_deps(self): 820 """Return the string representation of the dependencies of the node.""" 821 lines = [] 822 app = lines.append 823 824 app("Dependencies of node %s:" % str(self)) 825 for i, dep in enumerate(self.deps): 826 app("%d) %s, status=%s" % (i, dep.info, str(dep.status))) 827 828 return "\n".join(lines) 829 830 def get_vars_dataframe(self, *varnames): 831 """ 832 Return pandas DataFrame with the value of the variables specified in `varnames`. 833 Can be used for task/works/flow. It's recursive! 834 835 .. example: 836 837 flow.get_vars_dataframe("ecut", "ngkpt") 838 work.get_vars_dataframe("acell", "usepawu") 839 """ 840 import pandas as pd 841 if self.is_task: 842 df = pd.DataFrame([{v: self.input.get(v, None) for v in varnames}], index=[self.name], columns=varnames) 843 df["class"] = self.__class__.__name__ 844 return df 845 846 elif self.is_work: 847 frames = [task.get_vars_dataframe(*varnames) for task in self] 848 return pd.concat(frames) 849 850 elif self.is_flow: 851 frames = [work.get_vars_dataframe(*varnames) for work in self] 852 return pd.concat(frames) 853 854 else: 855 #print("Ignoring node of type: `%s`" % type(self)) 856 return pd.DataFrame(index=[self.name]) 857 858 def get_graphviz_dirtree(self, engine="automatic", **kwargs): 859 """ 860 Generate directory graph in the DOT language. The graph show the files and directories 861 in the node workdir. 862 863 Returns: graphviz.Digraph <https://graphviz.readthedocs.io/en/stable/api.html#digraph> 864 """ 865 if engine == "automatic": 866 engine = "fdp" 867 868 return Dirviz(self.workdir).get_cluster_graph(engine=engine, **kwargs) 869 870 def set_gc(self, gc): 871 """ 872 Set the garbage collector. 873 """ 874 assert isinstance(gc, GarbageCollector) 875 self._gc = gc 876 877 @property 878 def gc(self): 879 """ 880 Garbage collector. None if garbage collection is deactivated. 881 Use flow.set_garbage_collector to initialize the object. 882 """ 883 try: 884 return self._gc 885 except AttributeError: 886 #if not self.is_flow and self.flow.gc: return self.flow.gc 887 return None 888 889 @property 890 def event_handlers(self): 891 """ 892 The list of handlers registered for this node. 893 If the node is not a `Flow` and does not have its own list of 894 `handlers` the handlers registered at the level of the flow are returned. 895 896 This trick allows one to registered different handlers at the level of the Task 897 for testing purposes. By default, we have a common list of handlers for all the nodes in the flow. 898 This choice facilitates the automatic installation of the handlers when we use callbacks to generate 899 new Works and Tasks! 900 """ 901 if self.is_flow: 902 return self._event_handlers 903 904 try: 905 return self._event_handlers 906 except AttributeError: 907 return self.flow._event_handlers 908 909 @check_spectator 910 def install_event_handlers(self, categories=None, handlers=None): 911 """ 912 Install the `EventHandlers for this `Node`. If no argument is provided 913 the default list of handlers is installed. 914 915 Args: 916 categories: List of categories to install e.g. base + can_change_physics 917 handlers: explicit list of :class:`EventHandler` instances. 918 This is the most flexible way to install handlers. 919 920 .. note:: 921 922 categories and handlers are mutually exclusive. 923 """ 924 if categories is not None and handlers is not None: 925 raise ValueError("categories and handlers are mutually exclusive!") 926 927 from .events import get_event_handler_classes 928 if categories: 929 raise NotImplementedError() 930 handlers = [cls() for cls in get_event_handler_classes(categories=categories)] 931 else: 932 handlers = handlers or [cls() for cls in get_event_handler_classes()] 933 934 self._event_handlers = handlers 935 936 def show_event_handlers(self, stream=sys.stdout, verbose=0): 937 """Print to `stream` the event handlers installed for this flow.""" 938 lines = ["List of event handlers installed:"] 939 for handler in self.event_handlers: 940 if verbose: 941 lines.extend(handler.__class__.cls2str().split("\n")) 942 else: 943 lines.extend(str(handler).split("\n")) 944 945 stream.write("\n".join(lines)) 946 stream.write("\n") 947 948 def send_signal(self, signal): 949 """ 950 Send signal from this node to all connected receivers unless the node is in spectator mode. 951 952 signal -- (hashable) signal value, see `dispatcher` connect for details 953 954 Return a list of tuple pairs [(receiver, response), ... ] 955 or None if the node is in spectator mode. 956 957 if any receiver raises an error, the error propagates back 958 through send, terminating the dispatch loop, so it is quite 959 possible to not have all receivers called if a raises an error. 960 """ 961 if self.in_spectator_mode: return None 962 self.history.debug("Node %s broadcasts signal %s" % (self, signal)) 963 dispatcher.send(signal=signal, sender=self) 964 965 ########################## 966 ### Abstract protocol #### 967 ########################## 968 969 @property 970 @abc.abstractmethod 971 def status(self): 972 """The status of the `Node`.""" 973 974 @abc.abstractmethod 975 def check_status(self): 976 """Check the status of the `Node`.""" 977 978 979class FileNode(Node): 980 """ 981 A Node that consists of a file. May be not yet existing 982 983 Mainly used to connect |Task| objects to external files produced in previous runs. 984 """ 985 color_rgb = np.array((102, 51, 255)) / 255 986 987 def __init__(self, filename): 988 super().__init__() 989 self.filepath = os.path.abspath(filename) 990 991 # Directories with input|output|temporary data. 992 self.workdir = os.path.dirname(self.filepath) 993 994 self.indir = Directory(self.workdir) 995 self.outdir = Directory(self.workdir) 996 self.tmpdir = Directory(self.workdir) 997 998 self._filechildren = [] 999 1000 def __repr__(self): 1001 try: 1002 return "<%s, node_id=%s, rpath=%s>" % ( 1003 self.__class__.__name__, self.node_id, os.path.relpath(self.filepath)) 1004 except AttributeError: 1005 # this usually happens when workdir has not been initialized 1006 return "<%s, node_id=%s, path=%s>" % (self.__class__.__name__, self.node_id, self.filepath) 1007 1008 @lazy_property 1009 def basename(self): 1010 """Basename of the file.""" 1011 return os.path.basename(self.filepath) 1012 1013 @property 1014 def products(self): 1015 return [Product.from_file(self.filepath)] 1016 1017 def opath_from_ext(self, ext): 1018 return self.filepath 1019 1020 @property 1021 def status(self): 1022 return self.S_OK if os.path.exists(self.filepath) else self.S_ERROR 1023 1024 def check_status(self): 1025 return self.status 1026 1027 def get_results(self, **kwargs): 1028 results = super().get_results(**kwargs) 1029 #results.register_gridfs_files(filepath=self.filepath) 1030 return results 1031 1032 def add_filechild(self, node): 1033 """Add a node (usually Task) to the children of this FileNode.""" 1034 self._filechildren.append(node) 1035 1036 @property 1037 def filechildren(self): 1038 """List with the children (nodes) of this FileNode.""" 1039 return self._filechildren 1040 1041 # This part provides IO capabilities to FileNode with API similar to the one implemented in Task. 1042 # We may need it at runtime to extract information from netcdf files e.g. 1043 # a NscfTask will change the FFT grid to match the one used in the GsTask. 1044 1045 def abiopen(self): 1046 from abipy import abilab 1047 return abilab.abiopen(self.filepath) 1048 1049 def open_gsr(self): 1050 return self._abiopen_abiext("_GSR.nc") 1051 1052 def _abiopen_abiext(self, abiext): 1053 import glob 1054 from abipy import abilab 1055 if not self.filepath.endswith(abiext): 1056 msg = """\n 1057File type does not match the abinit file extension. 1058Caller asked for abiext: `%s` whereas filepath: `%s`. 1059Continuing anyway assuming that the netcdf file provides the API/dims/vars neeeded by the caller. 1060""" % (abiext, self.filepath) 1061 self.history.warning(msg) 1062 1063 #try to find file in the same path 1064 filepath = os.path.dirname(self.filepath) 1065 glob_result = glob.glob(os.path.join(filepath, "*%s" % abiext)) 1066 if len(glob_result): return abilab.abiopen(glob_result[0]) 1067 return self.abiopen() 1068 1069 1070class HistoryRecord(object): 1071 """ 1072 A `HistoryRecord` instance represents an entry in the :class:`NodeHistory`. 1073 1074 `HistoryRecord` instances are created every time something is logged. 1075 They contain all the information pertinent to the event being logged. 1076 The main information passed in is in msg and args, which are combined 1077 using str(msg) % args to create the message field of the record. 1078 The record also includes information such as when the record was created, 1079 the source line where the logging call was made 1080 1081 .. attribute:: levelno 1082 1083 Numeric logging level for the message (DEBUG, INFO, WARNING, ERROR, CRITICAL) 1084 1085 .. attribute:: levelname 1086 1087 Text logging level for the message ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL") 1088 1089 .. attribute:: pathname 1090 1091 Full pathname of the source file where the logging call was issued (if available) 1092 1093 .. attribute:: filename 1094 1095 Filename portion of pathname 1096 1097 .. attribute:: module 1098 1099 Module (name portion of filename) 1100 1101 .. attribute:: lineno 1102 1103 Source line number where the logging call was issued (if available) 1104 1105 .. attribute:: func_name 1106 1107 Function name 1108 1109 .. attribute:: created 1110 1111 Time when the HistoryRecord was created (time.time() return value) 1112 1113 .. attribute:: asctime 1114 1115 Textual time when the HistoryRecord was created 1116 1117 .. attribute:: message 1118 The result of record.getMessage(), computed just as the record is emitted 1119 """ 1120 def __init__(self, level, pathname, lineno, msg, args, exc_info, func=None): 1121 """ 1122 Initialize a logging record with interesting information. 1123 """ 1124 # 1125 # The following statement allows passing of a dictionary as a sole 1126 # argument, so that you can do something like 1127 # logging.debug("a %(a)d b %(b)s", {'a':1, 'b':2}) 1128 # Suggested by Stefan Behnel. 1129 # Note that without the test for args[0], we get a problem because 1130 # during formatting, we test to see if the arg is present using 1131 # 'if self.args:'. If the event being logged is e.g. 'Value is %d' 1132 # and if the passed arg fails 'if self.args:' then no formatting 1133 # is done. For example, logger.warn('Value is %d', 0) would log 1134 # 'Value is %d' instead of 'Value is 0'. 1135 # For the use case of passing a dictionary, this should not be a problem. 1136 if args and len(args) == 1 and isinstance(args[0], dict) and args[0]: 1137 args = args[0] 1138 self.args = args 1139 self.levelno = level 1140 self.pathname = pathname 1141 self.msg = msg 1142 1143 self.levelname = "FOOBAR" #getLevelName(level) 1144 1145 try: 1146 self.filename = os.path.basename(pathname) 1147 self.module = os.path.splitext(self.filename)[0] 1148 except (TypeError, ValueError, AttributeError): 1149 self.filename = pathname 1150 self.module = "Unknown module" 1151 1152 self.exc_info = exc_info 1153 self.exc_text = None # used to cache the traceback text 1154 self.lineno = lineno 1155 self.func_name = func 1156 self.created = time.time() 1157 self.asctime = time.asctime() 1158 # Remove milliseconds 1159 i = self.asctime.find(".") 1160 if i != -1: self.asctime = self.asctime[:i] 1161 1162 def __repr__(self): 1163 return '<%s, %s, %s, %s,\n"%s">' % (self.__class__.__name__, self.levelno, self.pathname, self.lineno, self.msg) 1164 1165 def __str__(self): 1166 return self.get_message(metadata=False) 1167 1168 def get_message(self, metadata=False, asctime=True): 1169 """ 1170 Return the message after merging any user-supplied arguments with the message. 1171 1172 Args: 1173 metadata: True if function and module name should be added. 1174 asctime: True if time string should be added. 1175 """ 1176 msg = self.msg if is_string(self.msg) else str(self.msg) 1177 if self.args: 1178 try: 1179 msg = msg % self.args 1180 except Exception: 1181 msg += str(self.args) 1182 1183 if asctime: msg = "[" + self.asctime + "] " + msg 1184 1185 # Add metadata 1186 if metadata: 1187 msg += "\nCalled by %s at %s:%s\n" % (self.func_name, self.pathname, self.lineno) 1188 1189 return msg 1190 1191 @pmg_serialize 1192 def as_dict(self): 1193 return {'level': self.levelno, 'pathname': self.pathname, 'lineno': self.lineno, 'msg': self.msg, 1194 'args': self.args, 'exc_info': self.exc_info, 'func': self.func_name} 1195 1196 @classmethod 1197 def from_dict(cls, d): 1198 return cls(level=d['level'], pathname=d['pathname'], lineno=int(d['lineno']), msg=d['msg'], args=d['args'], 1199 exc_info=d['exc_info'], func=d['func']) 1200 1201 1202class NodeHistory(collections.deque): 1203 """Logger-like object""" 1204 1205 def __str__(self): 1206 return self.to_string() 1207 1208 def to_string(self, metadata=False): 1209 """Returns a string with the history. Set metadata to True to have info on function and module.""" 1210 return "\n".join(rec.get_message(metadata=metadata) for rec in self) 1211 1212 def info(self, msg, *args, **kwargs): 1213 """Log 'msg % args' with the info severity level""" 1214 self._log("INFO", msg, args, kwargs) 1215 1216 def warning(self, msg, *args, **kwargs): 1217 """Log 'msg % args' with the warning severity level""" 1218 self._log("WARNING", msg, args, kwargs) 1219 1220 def critical(self, msg, *args, **kwargs): 1221 """Log 'msg % args' with the critical severity level""" 1222 self._log("CRITICAL", msg, args, kwargs) 1223 1224 def debug(self, msg, *args, **kwargs): 1225 """Log 'msg % args' with the critical severity level""" 1226 self._log("DEBUG", msg, args, kwargs) 1227 1228 def _log(self, level, msg, args, exc_info=None, extra=None): 1229 """Low-level logging routine which creates a :class:`HistoryRecord`.""" 1230 if exc_info and not isinstance(exc_info, tuple): 1231 exc_info = sys.exc_info() 1232 1233 self.append(HistoryRecord(level, "unknown filename", 0, msg, args, exc_info, func="unknown func")) 1234 1235 1236class NodeCorrections(list): 1237 """Iterable storing the correctios performed by the :class:`EventHandler`""" 1238 #TODO 1239 # Correction should have a human-readable message 1240 # and a list of operatins in JSON format (Modder?) so that 1241 # we can read them and re-apply the corrections to another task if needed. 1242 1243 #def count_event_class(self, event_class): 1244 # """ 1245 # Return the number of times the event class has been already fixed. 1246 # """ 1247 # #return len([c for c in self if c["event"]["@class"] == str(event_class)]) 1248 1249 #def _find(self, event_class) 1250 1251 1252class GarbageCollector(object): 1253 """This object stores information on the """ 1254 def __init__(self, exts, policy): 1255 self.exts, self.policy = set(exts), policy 1256 1257 1258# The code below initializes a counter from a file when the module is imported 1259# and save the counter's updated value automatically when the program terminates 1260# without relying on the application making an explicit call into this module at termination. 1261 1262_COUNTER = None 1263_COUNTER_FILE = os.path.join(os.path.expanduser("~"), ".abinit", "abipy", "nodecounter") 1264 1265 1266def init_counter(): 1267 global _COUNTER 1268 1269 # Make dir and file if not present. 1270 if not os.path.exists(os.path.dirname(_COUNTER_FILE)): 1271 os.makedirs(os.path.dirname(_COUNTER_FILE)) 1272 1273 if not os.path.exists(_COUNTER_FILE): 1274 with open(_COUNTER_FILE, "wt") as fh: 1275 fh.write("%d\n" % -1) 1276 1277 if _COUNTER is None: 1278 with open(_COUNTER_FILE, "r") as fh: 1279 s = fh.read().strip() 1280 if not s: s = "-1" 1281 _COUNTER = int(s) 1282 1283 1284def get_newnode_id(): 1285 """ 1286 Returns a new node identifier used for |Task|, |Work| and |Flow| objects. 1287 1288 .. warning: 1289 1290 The id is unique inside the same python process so be careful when 1291 Works and Tasks are constructed at run-time or when threads are used. 1292 """ 1293 init_counter() 1294 1295 global _COUNTER 1296 _COUNTER += 1 1297 return _COUNTER 1298 1299 1300def save_lastnode_id(): 1301 """Save the id of the last node created.""" 1302 init_counter() 1303 1304 with FileLock(_COUNTER_FILE): 1305 with AtomicFile(_COUNTER_FILE, mode="w") as fh: 1306 fh.write("%d\n" % _COUNTER) 1307 1308 1309# Register function atexit 1310import atexit 1311atexit.register(save_lastnode_id) 1312