# coding: utf-8
"""This module provides functions and classes related to Task objects."""
import os
import time
import datetime
import shutil
import collections
import abc
import copy
import ruamel.yaml as yaml
from io import StringIO
import numpy as np

from pprint import pprint
from itertools import product
from monty.string import is_string, list_strings
from monty.termcolor import colored, cprint
from monty.collections import AttrDict
from monty.functools import lazy_property, return_none_if_raise
from monty.json import MSONable
from monty.fnmatch import WildCard
from pymatgen.core.units import Memory
from pymatgen.util.serialization import json_pretty_dump, pmg_serialize
from .utils import File, Directory, irdvars_for_ext, abi_splitext, FilepathFixer, Condition, SparseHistogram
from .qadapters import make_qadapter, QueueAdapter, QueueAdapterError
from . import qutils as qu
from .db import DBConnector
from .nodes import Status, Node, NodeError, NodeResults, FileNode #, check_spectator
from . import abiinspect
from . import events
from .abitimer import AbinitTimerParser


__author__ = "Matteo Giantomassi"
__copyright__ = "Copyright 2013, The Materials Project"
__version__ = "0.1"
__maintainer__ = "Matteo Giantomassi"

__all__ = [
    "TaskManager",
    "AbinitBuild",
    "ParalHintsParser",
    "ParalHints",
    "AbinitTask",
    "ScfTask",
    "NscfTask",
    "RelaxTask",
    "DdkTask",
    "EffMassTask",
    "PhononTask",
    "ElasticTask",
    "SigmaTask",
    "EphTask",
    "OpticTask",
    "AnaddbTask",
    "set_user_config_taskmanager",
]

import logging
logger = logging.getLogger(__name__)

# Tools and helper functions.


def straceback():
    """Returns a string with the traceback."""
    import traceback
    return traceback.format_exc()


def nmltostring(nml):
    """Convert a dictionary representing a Fortran namelist into a string."""
    if not isinstance(nml, dict):
        raise ValueError("nml should be a dict!")

    curstr = ""
    for key, group in nml.items():
        namelist = ["&" + key]
        for k, v in group.items():
            if isinstance(v, (list, tuple)):
                namelist.append(k + " = " + ",".join(map(str, v)) + ",")
            elif is_string(v):
                namelist.append(k + " = '" + str(v) + "',")
            else:
                namelist.append(k + " = " + str(v) + ",")
        namelist.append("/")
        curstr = curstr + "\n".join(namelist) + "\n"

    return curstr


class TaskResults(NodeResults):

    JSON_SCHEMA = NodeResults.JSON_SCHEMA.copy()
    JSON_SCHEMA["properties"] = {
        "executable": {"type": "string", "required": True},
    }

    @classmethod
    def from_node(cls, task):
        """Initialize an instance from an |AbinitTask| instance."""
        new = super().from_node(task)

        new.update(
            executable=task.executable,
            #executable_version:
            #task_events=
            pseudos=[p.as_dict() for p in task.input.pseudos],
            #input=task.input
        )

        new.register_gridfs_files(
            run_abi=(task.input_file.path, "t"),
            run_abo=(task.output_file.path, "t"),
        )

        return new


class ParalConf(AttrDict):
    """
    This object stores the parameters associated with one of the possible
    parallel configurations reported by ABINIT.
    Essentially it is a dictionary whose values can also be accessed
    as attributes. It also provides default values for selected keys
    that might not be present in the ABINIT dictionary.

    Example:

        --- !Autoparal
        info:
            version: 1
            autoparal: 1
            max_ncpus: 108
        configurations:
            - tot_ncpus: 2       # Total number of CPUs
              mpi_ncpus: 2       # Number of MPI processes.
              omp_ncpus: 1       # Number of OMP threads (1 if not present)
              mem_per_cpu: 10    # Estimated memory requirement per MPI processor in Megabytes.
              efficiency: 0.4    # 1.0 corresponds to an "expected" optimal efficiency (strong scaling).
              vars: {            # Dictionary with the variables that should be added to the input.
                    varname1: varvalue1
                    varname2: varvalue2
                    }
            -
        ...

    For paral_kgb we have:

    nproc     npkpt    npspinor    npband     npfft    bandpp    weight
      108       1         1          12         9        2        0.25
      108       1         1         108         1        2       27.00
       96       1         1          24         4        1        1.50
       84       1         1          12         7        2        0.25
    """
    _DEFAULTS = {
        "omp_ncpus": 1,
        "mem_per_cpu": 0.0,
        "vars": {}
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Add default values if not already in self.
        for k, v in self._DEFAULTS.items():
            if k not in self:
                self[k] = v

    def __str__(self):
        stream = StringIO()
        pprint(self, stream=stream)
        return stream.getvalue()

    @property
    def num_cores(self):
        return self.mpi_procs * self.omp_threads

    @property
    def mem_per_proc(self):
        return self.mem_per_cpu

    @property
    def mpi_procs(self):
        return self.mpi_ncpus

    @property
    def omp_threads(self):
        return self.omp_ncpus

    @property
    def speedup(self):
        """Estimated speedup reported by ABINIT."""
        return self.efficiency * self.num_cores

    @property
    def tot_mem(self):
        """Estimated total memory in Mbs (computed from mem_per_proc)"""
        return self.mem_per_proc * self.mpi_procs


class ParalHintsError(Exception):
    """Base error class for `ParalHints`."""


class ParalHintsParser(object):

    Error = ParalHintsError

    def __init__(self):
        # Used to push error strings.
        self._errors = collections.deque(maxlen=100)

    def add_error(self, errmsg):
        self._errors.append(errmsg)

    def parse(self, filename):
        """
        Read the `AutoParal` section (YAML format) from filename.
        Assumes the file contains only one section.
        """
        with abiinspect.YamlTokenizer(filename) as r:
            doc = r.next_doc_with_tag("!Autoparal")
            try:
                d = yaml.safe_load(doc.text_notag)
                return ParalHints(info=d["info"], confs=d["configurations"])
            except Exception:
                import traceback
                sexc = traceback.format_exc()
                err_msg = "Wrong YAML doc:\n%s\n\nException:\n%s" % (doc.text, sexc)
                self.add_error(err_msg)
                raise self.Error(err_msg)
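
# Example (illustrative sketch, not part of the public API docs; the filename below
# is just a placeholder): parse the !Autoparal section written by ABINIT when
# autoparal > 0 and inspect the resulting ParalHints.
#
#   parser = ParalHintsParser()
#   try:
#       hints = parser.parse("run.abo")
#       print("max_cores:", hints.max_cores, "max_speedup:", hints.max_speedup)
#   except ParalHintsParser.Error:
#       print("Could not parse the !Autoparal section")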


class ParalHints(collections.abc.Iterable):
    """
    Iterable with the hints for the parallel execution reported by ABINIT.
    """
    Error = ParalHintsError

    def __init__(self, info, confs):
        self.info = info
        self._confs = [ParalConf(**d) for d in confs]

    @classmethod
    def from_mpi_omp_lists(cls, mpi_procs, omp_threads):
        """
        Build a list of parallel configurations from two lists
        containing the number of MPI processes and the number of OpenMP threads
        i.e. product(mpi_procs, omp_threads).
        The configurations have parallel efficiency set to 1.0 and no input variables.
        Mainly used for preparing benchmarks.
        """
        info = {}
        confs = [ParalConf(mpi_ncpus=p, omp_ncpus=t, efficiency=1.0)
                 for p, t in product(mpi_procs, omp_threads)]

        return cls(info, confs)

    def __getitem__(self, key):
        return self._confs[key]

    def __iter__(self):
        return self._confs.__iter__()

    def __len__(self):
        return self._confs.__len__()

    def __repr__(self):
        return "\n".join(str(conf) for conf in self)

    def __str__(self):
        return repr(self)

    @lazy_property
    def max_cores(self):
        """Maximum number of cores."""
        return max(c.mpi_procs * c.omp_threads for c in self)

    @lazy_property
    def max_mem_per_proc(self):
        """Maximum memory per MPI process."""
        return max(c.mem_per_proc for c in self)

    @lazy_property
    def max_speedup(self):
        """Maximum speedup."""
        return max(c.speedup for c in self)

    @lazy_property
    def max_efficiency(self):
        """Maximum parallel efficiency."""
        return max(c.efficiency for c in self)

    @pmg_serialize
    def as_dict(self, **kwargs):
        return {"info": self.info, "confs": self._confs}

    @classmethod
    def from_dict(cls, d):
        return cls(info=d["info"], confs=d["confs"])

    def copy(self):
        """Shallow copy of self."""
        return copy.copy(self)

    def select_with_condition(self, condition, key=None):
        """
        Remove all the configurations that do not satisfy the given condition.

        Args:
            condition: dict or :class:`Condition` object with operators expressed with a Mongodb-like syntax
            key: Selects the sub-dictionary on which condition is applied, e.g. key="vars"
                if we have to filter the configurations depending on the values in vars.
        """
        condition = Condition.as_condition(condition)
        new_confs = []

        for conf in self:
            # Select the object on which condition is applied
            obj = conf if key is None else AttrDict(conf[key])
            add_it = condition(obj=obj)
            #if key is "vars": print("conf", conf, "added:", add_it)
            if add_it: new_confs.append(conf)

        self._confs = new_confs

    def sort_by_efficiency(self, reverse=True):
        """Sort the configurations in place. Items with the highest efficiency come first."""
        self._confs.sort(key=lambda c: c.efficiency, reverse=reverse)
        return self

    def sort_by_speedup(self, reverse=True):
        """Sort the configurations in place. Items with the highest speedup come first."""
        self._confs.sort(key=lambda c: c.speedup, reverse=reverse)
        return self

    def sort_by_mem_per_proc(self, reverse=False):
        """Sort the configurations in place. Items with the lowest memory per process come first."""
        # Avoid sorting if mem_per_cpu is not available.
        if any(c.mem_per_proc > 0.0 for c in self):
            self._confs.sort(key=lambda c: c.mem_per_proc, reverse=reverse)
        return self

    def multidimensional_optimization(self, priorities=("speedup", "efficiency")):
        # Mapping property --> options passed to sparse_histogram
        opts = dict(speedup=dict(step=1.0), efficiency=dict(step=0.1), mem_per_proc=dict(memory=1024))
        #opts = dict(zip(priorities, bin_widths))

        opt_confs = self._confs
        for priority in priorities:
            histogram = SparseHistogram(opt_confs, key=lambda c: getattr(c, priority), **opts[priority])
            pos = 0 if priority == "mem_per_proc" else -1
            opt_confs = histogram.values[pos]

        #histogram.plot(show=True, savefig="hello.pdf")
        return self.__class__(info=self.info, confs=opt_confs)

    #def histogram_efficiency(self, step=0.1):
    #    """Returns a :class:`SparseHistogram` with configurations grouped by parallel efficiency."""
    #    return SparseHistogram(self._confs, key=lambda c: c.efficiency, step=step)

    #def histogram_speedup(self, step=1.0):
    #    """Returns a :class:`SparseHistogram` with configurations grouped by parallel speedup."""
    #    return SparseHistogram(self._confs, key=lambda c: c.speedup, step=step)

    #def histogram_memory(self, step=1024):
    #    """Returns a :class:`SparseHistogram` with configurations grouped by memory."""
    #    return SparseHistogram(self._confs, key=lambda c: c.speedup, step=step)

    #def filter(self, qadapter):
    #    """Return a new list of configurations that can be executed on the `QueueAdapter` qadapter."""
    #    new_confs = [pconf for pconf in self if qadapter.can_run_pconf(pconf)]
    #    return self.__class__(info=self.info, confs=new_confs)

    def get_ordered_with_policy(self, policy, max_ncpus):
        """
        Sort and return a new list of configurations ordered according to the :class:`TaskPolicy` policy.
        """
        # Build a new list since we are going to change the object in place.
        hints = self.__class__(self.info, confs=[c for c in self if c.num_cores <= max_ncpus])

        # First select the configurations satisfying the condition specified by the user (if any)
        bkp_hints = hints.copy()
        if policy.condition:
            logger.info("Applying condition %s" % str(policy.condition))
            hints.select_with_condition(policy.condition)

            # Undo the change if no configuration fulfills the requirements.
            if not hints:
                hints = bkp_hints
                logger.warning("Empty list of configurations after policy.condition")

        # Now filter the configurations depending on the values in vars
        bkp_hints = hints.copy()
        if policy.vars_condition:
            logger.info("Applying vars_condition %s" % str(policy.vars_condition))
            hints.select_with_condition(policy.vars_condition, key="vars")

            # Undo the change if no configuration fulfills the requirements.
            if not hints:
                hints = bkp_hints
                logger.warning("Empty list of configurations after policy.vars_condition")

        if len(policy.autoparal_priorities) == 1:
            # Example: hints.sort_by_speedup()
            if policy.autoparal_priorities[0] in ['efficiency', 'speedup', 'mem_per_proc']:
                getattr(hints, "sort_by_" + policy.autoparal_priorities[0])()
            elif isinstance(policy.autoparal_priorities[0], collections.abc.Mapping):
                if policy.autoparal_priorities[0]['meta_priority'] == 'highest_speedup_minimum_efficiency_cutoff':
                    min_efficiency = policy.autoparal_priorities[0].get('minimum_efficiency', 1.0)
                    hints.select_with_condition({'efficiency': {'$gte': min_efficiency}})
                    hints.sort_by_speedup()
        else:
            hints = hints.multidimensional_optimization(priorities=policy.autoparal_priorities)
            if len(hints) == 0: raise ValueError("len(hints) == 0")

        #TODO: make sure that num_cores == 1 is never selected when we have more than one configuration
        #if len(hints) > 1:
        #    hints.select_with_condition(dict(num_cores={"$eq": 1}))

        # Return the final (ordered) list of configurations (best first).
        return hints
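
# Illustrative sketch (assumed values, not taken from a real ABINIT run): once a
# ParalHints object is available, configurations can be filtered with a MongoDB-like
# condition and sorted before picking the best one.
#
#   hints = ParalHints(info={}, confs=[
#       dict(tot_ncpus=2, mpi_ncpus=2, efficiency=0.9),
#       dict(tot_ncpus=4, mpi_ncpus=4, efficiency=0.6),
#   ])
#   hints.select_with_condition({"efficiency": {"$gte": 0.7}})
#   best = hints.sort_by_speedup()[0]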


class TaskPolicy(object):
    """
    This object stores the parameters used by the |TaskManager| to
    create the submission script and/or to modify the ABINIT variables
    governing the parallel execution. A `TaskPolicy` object contains
    a set of variables that specify the launcher, as well as the options
    and the conditions used to select the optimal configuration for the parallel run.
    """
    @classmethod
    def as_policy(cls, obj):
        """
        Converts an object obj into a :class:`TaskPolicy`. Accepts:

            * None
            * TaskPolicy
            * dict-like object
        """
        if obj is None:
            # Use default policy.
            return TaskPolicy()
        else:
            if isinstance(obj, cls):
                return obj
            elif isinstance(obj, collections.abc.Mapping):
                return cls(**obj)
            else:
                raise TypeError("Don't know how to convert type %s to %s" % (type(obj), cls))

    @classmethod
    def autodoc(cls):
        return """
    autoparal:            # (integer). 0 to disable the autoparal feature (DEFAULT: 1 i.e. autoparal is on)
    condition:            # condition used to filter the autoparal configurations (Mongodb-like syntax).
                          # DEFAULT: empty i.e. ignored.
    vars_condition:       # Condition used to filter the list of ABINIT variables reported by autoparal
                          # (Mongodb-like syntax). DEFAULT: empty i.e. ignored.
    frozen_timeout:       # A job is considered frozen and its status is set to ERROR if no change to
                          # the output file has been done for `frozen_timeout` seconds. Accepts int with seconds or
                          # string in slurm form i.e. days-hours:minutes:seconds. DEFAULT: 1 hour.
    precedence:           # Under development.
    autoparal_priorities: # Under development.
"""

    def __init__(self, **kwargs):
        """
        See autodoc
        """
        self.autoparal = kwargs.pop("autoparal", 1)
        self.condition = Condition(kwargs.pop("condition", {}))
        self.vars_condition = Condition(kwargs.pop("vars_condition", {}))
        self.precedence = kwargs.pop("precedence", "autoparal_conf")
        self.autoparal_priorities = kwargs.pop("autoparal_priorities", ["speedup"])
        #self.autoparal_priorities = kwargs.pop("autoparal_priorities", ["speedup", "efficiency", "memory"])
        # TODO frozen_timeout could be computed as a fraction of the timelimit of the qadapter!
        self.frozen_timeout = qu.slurm_parse_timestr(kwargs.pop("frozen_timeout", "0-1:00:00"))

        if kwargs:
            raise ValueError("Found invalid keywords in policy section:\n %s" % str(kwargs.keys()))

        # Consistency check.
        if self.precedence not in ("qadapter", "autoparal_conf"):
            raise ValueError("Wrong value for policy.precedence, should be qadapter or autoparal_conf")

    def __str__(self):
        lines = []
        app = lines.append
        for k, v in self.__dict__.items():
            if k.startswith("_"): continue
            app("%s: %s" % (k, v))
        return "\n".join(lines)


class ManagerIncreaseError(Exception):
    """
    Exception raised by the manager if the increase request failed.
    """


class FixQueueCriticalError(Exception):
    """
    Error raised when an error could not be fixed at the task level.
    """


# Global variable used to store the task manager returned by `from_user_config`.
_USER_CONFIG_TASKMANAGER = None


def set_user_config_taskmanager(manager):
    """Change the default manager returned by TaskManager.from_user_config."""
    global _USER_CONFIG_TASKMANAGER
    _USER_CONFIG_TASKMANAGER = manager


class TaskManager(MSONable):
    """
    A `TaskManager` is responsible for the generation of the job script and the submission
    of the task, as well as for the specification of the parameters passed to the resource manager
    (e.g. Slurm, PBS, ...) and/or the run-time specification of the ABINIT variables governing the parallel execution.
    A `TaskManager` delegates the generation of the submission script and the submission of the task to the :class:`QueueAdapter`.
    A `TaskManager` has a :class:`TaskPolicy` that governs the specification of the parameters for the parallel executions.
    Ideally, the TaskManager should be the **main entry point** used by the task to deal with job submission/optimization.
    """
    YAML_FILE = "manager.yml"
    USER_CONFIG_DIR = os.path.join(os.path.expanduser("~"), ".abinit", "abipy")

    ENTRIES = {"policy", "qadapters", "db_connector", "batch_adapter"}

    @classmethod
    def autodoc(cls):
        s = """
# TaskManager configuration file (YAML Format)

policy:
    # Dictionary with options used to control the execution of the tasks.

qadapters:
    # List of qadapters objects (mandatory)
    -  # qadapter_1
    -  # qadapter_2

db_connector:
    # Connection to MongoDB database (optional)

batch_adapter:
    # Adapter used to submit flows with batch script. (optional)

##########################################
# Individual entries are documented below:
##########################################

"""
        s += "policy: " + TaskPolicy.autodoc() + "\n"
        s += "qadapter: " + QueueAdapter.autodoc() + "\n"
        return s

    @classmethod
    def from_user_config(cls):
        """
        Initialize the |TaskManager| from the YAML file 'manager.yml'.
        Search first in the working directory and then in the AbiPy configuration directory.

        Raises:
            RuntimeError if file is not found.
        """
        global _USER_CONFIG_TASKMANAGER
        if _USER_CONFIG_TASKMANAGER is not None:
            return _USER_CONFIG_TASKMANAGER

        # Try in the current directory then in the user configuration directory.
        path = os.path.join(os.getcwd(), cls.YAML_FILE)
        if not os.path.exists(path):
            path = os.path.join(cls.USER_CONFIG_DIR, cls.YAML_FILE)

        if not os.path.exists(path):
            raise RuntimeError(colored(
                "\nCannot locate %s in the current directory or in %s\n"
                "PLEASE READ THIS: !!!\n"
                "To use AbiPy to run jobs this file must be present\n"
                "It provides a description of the cluster/computer you are running on\n"
                "Examples are provided in abipy/data/managers.\n"
                "Use `abidoc.py manager` to access the documentation in the terminal.\n"
                "See also: https://abinit.github.io/abipy/workflows/manager_examples.html\n" % (
                    cls.YAML_FILE, path), color="red"))

        _USER_CONFIG_TASKMANAGER = cls.from_file(path)
        return _USER_CONFIG_TASKMANAGER
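
    # Illustrative sketch of a minimal manager.yml for a single workstation.
    # Field names follow the qadapter template printed by `abidoc.py manager`;
    # treat the values below as placeholders, not a tested configuration:
    #
    #   qadapters:
    #       - priority: 1
    #         queue: {qtype: shell, qname: localhost}
    #         job: {mpi_runner: mpirun}
    #         limits: {timelimit: "1:00:00", min_cores: 1, max_cores: 4}
    #         hardware: {num_nodes: 1, sockets_per_node: 1, cores_per_socket: 4, mem_per_node: 8 Gb}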

    @classmethod
    def from_file(cls, filename):
        """Read the configuration parameters from the Yaml file filename."""
        try:
            with open(filename, "rt") as fh:
                return cls.from_dict(yaml.safe_load(fh))
        except Exception as exc:
            print("Error while reading TaskManager parameters from %s\n" % filename)
            raise

    @classmethod
    def from_string(cls, s):
        """Create an instance from string s containing a YAML dictionary."""
        return cls.from_dict(yaml.safe_load(s))

    @classmethod
    def as_manager(cls, obj):
        """
        Convert obj into a TaskManager instance. Accepts string, filepath, dictionary, `TaskManager` object.
        If obj is None, the manager is initialized from the user config file.
        """
        if isinstance(obj, cls): return obj
        if obj is None: return cls.from_user_config()

        if is_string(obj):
            if os.path.exists(obj):
                return cls.from_file(obj)
            else:
                return cls.from_string(obj)

        elif isinstance(obj, collections.abc.Mapping):
            return cls.from_dict(obj)
        else:
            raise TypeError("Don't know how to convert type %s to TaskManager" % type(obj))

    @classmethod
    def from_dict(cls, d):
        """Create an instance from a dictionary."""
        return cls(**{k: v for k, v in d.items() if k in cls.ENTRIES})

    @pmg_serialize
    def as_dict(self):
        return copy.deepcopy(self._kwargs)

    def __init__(self, **kwargs):
        """
        Args:
            policy: Dictionary with options passed to TaskPolicy (None means default policy).
            qadapters: List of qadapters in YAML format.
            db_connector: Dictionary with data used to connect to the database (optional).
        """
        # Keep a copy of kwargs
        self._kwargs = copy.deepcopy(kwargs)

        self.policy = TaskPolicy.as_policy(kwargs.pop("policy", None))

        # Initialize database connector (if specified)
        self.db_connector = DBConnector(**kwargs.pop("db_connector", {}))

        # Build list of QAdapters. Neglect entry if priority == 0 or `enabled: no`.
        qads = []
        for d in kwargs.pop("qadapters"):
            if not d.get("enabled", True): continue
            qad = make_qadapter(**d)
            if qad.priority > 0:
                qads.append(qad)
            elif qad.priority < 0:
                raise ValueError("qadapter cannot have negative priority:\n %s" % qad)

        if not qads:
            raise ValueError("Received empty list of qadapters")
        #if len(qads) != 1:
        #    raise NotImplementedError("For the time being multiple qadapters are not supported! Please use one adapter")

        # Order qadapters according to priority.
        qads = sorted(qads, key=lambda q: q.priority)
        priorities = [q.priority for q in qads]
        if len(priorities) != len(set(priorities)):
            raise ValueError("Two or more qadapters have the same priority. This is not allowed. Check taskmanager.yml")

        self._qads, self._qadpos = tuple(qads), 0

        # Initialize the qadapter for batch script submission.
        d = kwargs.pop("batch_adapter", None)
        self.batch_adapter = None
        if d: self.batch_adapter = make_qadapter(**d)
        #print("batch_adapter", self.batch_adapter)

        if kwargs:
            raise ValueError("Found invalid keywords in the taskmanager file:\n %s" % str(list(kwargs.keys())))

    @lazy_property
    def abinit_build(self):
        """:class:`AbinitBuild` object with the Abinit version and the options used to build the code."""
        return AbinitBuild(manager=self)

    def to_shell_manager(self, mpi_procs=1):
        """
        Returns a new |TaskManager| with the same parameters as self but replace the :class:`QueueAdapter`
        with a :class:`ShellAdapter` with mpi_procs so that we can submit the job without passing through the queue.
        """
        my_kwargs = copy.deepcopy(self._kwargs)
        my_kwargs["policy"] = TaskPolicy(autoparal=0)

        # On BlueGene we need at least two qadapters.
        # One for running jobs on the computing nodes and another one
        # for running small jobs on the frontend. These two qadapters
        # will have different environments and different executables.
        # If none of the q-adapters has qtype == shell, we change qtype to shell
        # and we return a new Manager for sequential jobs with the same parameters as self.
        # If the list contains a qadapter with qtype == shell, we ignore the remaining qadapters
        # when we build the new Manager.
        has_shell_qad = False
        for d in my_kwargs["qadapters"]:
            if d["queue"]["qtype"] == "shell": has_shell_qad = True
        if has_shell_qad:
            my_kwargs["qadapters"] = [d for d in my_kwargs["qadapters"] if d["queue"]["qtype"] == "shell"]

        for d in my_kwargs["qadapters"]:
            d["queue"]["qtype"] = "shell"
            d["limits"]["min_cores"] = mpi_procs
            d["limits"]["max_cores"] = mpi_procs

            # If shell_runner is specified, replace mpi_runner with shell_runner
            # in the script used to run jobs on the frontend.
            # On some machines based on Slurm, indeed, mpirun/mpiexec is not available
            # and jobs should be executed with `srun -n4 exec` when running on the computing nodes
            # or with `exec` when running in sequential on the frontend.
            if "job" in d and "shell_runner" in d["job"]:
                shell_runner = d["job"]["shell_runner"]
                #print("shell_runner:", shell_runner, type(shell_runner))
                if not shell_runner or shell_runner == "None": shell_runner = ""
                d["job"]["mpi_runner"] = shell_runner
                #print("shell_runner:", shell_runner)

        #print(my_kwargs)
        new = self.__class__(**my_kwargs)
        new.set_mpi_procs(mpi_procs)

        return new

    def new_with_fixed_mpi_omp(self, mpi_procs, omp_threads):
        """
        Return a new `TaskManager` in which autoparal has been disabled.
        The jobs will be executed with `mpi_procs` MPI processes and `omp_threads` OpenMP threads.
        Useful for generating input files for benchmarks.
        """
        new = self.deepcopy()
        new.policy.autoparal = 0
        new.set_mpi_procs(mpi_procs)
        new.set_omp_threads(omp_threads)
        return new
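
    # Illustrative sketch (hypothetical numbers): build managers that bypass autoparal
    # and run every job with a fixed MPI/OpenMP layout, e.g. for benchmarks or for
    # quick sequential runs on the front-end.
    #
    #   manager = TaskManager.from_user_config()
    #   bench_manager = manager.new_with_fixed_mpi_omp(mpi_procs=4, omp_threads=2)
    #   shell_manager = manager.to_shell_manager(mpi_procs=1)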

    @property
    def has_queue(self):
        """True if we are submitting jobs via a queue manager."""
        return self.qadapter.QTYPE.lower() != "shell"

    @property
    def qads(self):
        """List of :class:`QueueAdapter` objects sorted according to priorities (highest priority comes first)."""
        return self._qads

    @property
    def qadapter(self):
        """The qadapter used to submit jobs."""
        return self._qads[self._qadpos]

    def select_qadapter(self, pconfs):
        """
        Given a list of parallel configurations, pconfs, this method selects an `optimal` configuration
        according to some criterion as well as the :class:`QueueAdapter` to use.

        Args:
            pconfs: :class:`ParalHints` object with the list of parallel configurations

        Returns:
            :class:`ParalConf` object with the `optimal` configuration.
        """
        # Order the list of configurations according to policy.
        policy, max_ncpus = self.policy, self.max_cores
        pconfs = pconfs.get_ordered_with_policy(policy, max_ncpus)

        if policy.precedence == "qadapter":

            # Try to run on the qadapter with the highest priority.
            for qadpos, qad in enumerate(self.qads):
                possible_pconfs = [pc for pc in pconfs if qad.can_run_pconf(pc)]

                if qad.allocation == "nodes":
                    #if qad.allocation in ["nodes", "force_nodes"]:
                    # Select the configuration divisible by nodes if possible.
                    for pconf in possible_pconfs:
                        if pconf.num_cores % qad.hw.cores_per_node == 0:
                            return self._use_qadpos_pconf(qadpos, pconf)

                # Here we select the first one.
                if possible_pconfs:
                    return self._use_qadpos_pconf(qadpos, possible_pconfs[0])

        elif policy.precedence == "autoparal_conf":
            # Try to run on the first pconf irrespectively of the priority of the qadapter.
            for pconf in pconfs:
                for qadpos, qad in enumerate(self.qads):

                    if qad.allocation == "nodes" and not pconf.num_cores % qad.hw.cores_per_node == 0:
                        continue  # Ignore it. not very clean

                    if qad.can_run_pconf(pconf):
                        return self._use_qadpos_pconf(qadpos, pconf)

        else:
            raise ValueError("Wrong value of policy.precedence = %s" % policy.precedence)

        # No qadapter could be found
        raise RuntimeError("Cannot find qadapter for this run!")

    def _use_qadpos_pconf(self, qadpos, pconf):
        """
        This function is called when we have accepted the :class:`ParalConf` pconf.
        Returns pconf.
        """
        self._qadpos = qadpos

        # Change the number of MPI/OMP cores.
        self.set_mpi_procs(pconf.mpi_procs)
        if self.has_omp: self.set_omp_threads(pconf.omp_threads)

        # Set memory per proc.
        #FIXME: The Fixer may have changed the memory per proc and it should not be reset by ParalConf.
        #self.set_mem_per_proc(pconf.mem_per_proc)
        return pconf

    def __str__(self):
        """String representation."""
        lines = []
        app = lines.append
        #app("[Task policy]\n%s" % str(self.policy))

        for i, qad in enumerate(self.qads):
            app("[Qadapter %d]\n%s" % (i, str(qad)))
        app("Qadapter selected: %d" % self._qadpos)

        if self.has_db:
            app("[MongoDB database]:")
            app(str(self.db_connector))

        return "\n".join(lines)

    @property
    def has_db(self):
        """True if we are using a MongoDB database."""
        return bool(self.db_connector)

    @property
    def has_omp(self):
        """True if we are using OpenMP parallelization."""
        return self.qadapter.has_omp

    @property
    def num_cores(self):
        """Total number of CPUs used to run the task."""
        return self.qadapter.num_cores

    @property
    def mpi_procs(self):
        """Number of MPI processes."""
        return self.qadapter.mpi_procs

    @property
    def mem_per_proc(self):
        """Memory per MPI process."""
        return self.qadapter.mem_per_proc

    @property
    def omp_threads(self):
        """Number of OpenMP threads."""
        return self.qadapter.omp_threads

    def deepcopy(self):
        """Deep copy of self."""
        return copy.deepcopy(self)

    def set_mpi_procs(self, mpi_procs):
        """Set the number of MPI processes to use."""
        self.qadapter.set_mpi_procs(mpi_procs)

    def set_omp_threads(self, omp_threads):
        """Set the number of OpenMP threads to use."""
        self.qadapter.set_omp_threads(omp_threads)

    def set_mem_per_proc(self, mem_mb):
        """Set the memory (in Megabytes) per CPU."""
        self.qadapter.set_mem_per_proc(mem_mb)

    @property
    def max_cores(self):
        """
        Maximum number of cores that can be used.
        This value is mainly used in the autoparal part to get the list of possible configurations.
        """
        return max(q.hint_cores for q in self.qads)

    def get_njobs_in_queue(self, username=None):
        """
        Returns the number of jobs in the queue,
        or None when the number of jobs cannot be determined.

        Args:
            username: (str) the username of the jobs to count (default is to autodetect)
        """
        return self.qadapter.get_njobs_in_queue(username=username)

    def cancel(self, job_id):
        """Cancel the job. Returns exit status."""
        return self.qadapter.cancel(job_id)

    def write_jobfile(self, task, **kwargs):
        """
        Write the submission script. Return the path of the script.

        ================  ============================================
        kwargs            Meaning
        ================  ============================================
        exec_args         List of arguments passed to task.executable.
                          Default: no arguments.
        ================  ============================================
        """
        script = self.qadapter.get_script_str(
            job_name=task.name,
            launch_dir=task.workdir,
            executable=task.executable,
            qout_path=task.qout_file.path,
            qerr_path=task.qerr_file.path,
            stdin=task.files_file.path,
            stdout=task.log_file.path,
            stderr=task.stderr_file.path,
            exec_args=kwargs.pop("exec_args", []),
        )

        # Write the script.
        with open(task.job_file.path, "w") as fh:
            fh.write(script)
            task.job_file.chmod(0o740)
            return task.job_file.path

    def launch(self, task, **kwargs):
        """
        Build the input files and submit the task via the :class:`Qadapter`.

        Args:
            task: |Task| object.

        Returns:
            Process object.
        """
        if task.status == task.S_LOCKED:
            raise ValueError("You shall not submit a locked task!")

        # Build the task
        task.build()

        # Pass information on the time limit to Abinit (we always assume ndtset == 1)
        if isinstance(task, AbinitTask):
            args = kwargs.get("exec_args", [])
            if args is None: args = []
            args = args[:]
            args.append("--timelimit %s" % qu.time2slurm(self.qadapter.timelimit))
            kwargs["exec_args"] = args

        # Write the submission script
        script_file = self.write_jobfile(task, **kwargs)

        # Submit the task and save the queue id.
        try:
            qjob, process = self.qadapter.submit_to_queue(script_file)
            task.set_status(task.S_SUB, msg='Submitted to queue')
            task.set_qjob(qjob)
            return process

        except self.qadapter.MaxNumLaunchesError as exc:
            # TODO: Here we should try to switch to another qadapter
            # 1) Find a new parallel configuration in those stored in task.pconfs
            # 2) Change the input file.
            # 3) Regenerate the submission script
            # 4) Relaunch
            task.set_status(task.S_ERROR, msg="max_num_launches reached: %s" % str(exc))
            raise

    def get_collection(self, **kwargs):
        """Return the MongoDB collection used to store the results."""
        return self.db_connector.get_collection(**kwargs)

    def increase_mem(self):
        # OLD
        # with GW calculations in mind with GW mem = 10,
        # the response function is in memory and not distributed
        # we need to increase memory if jobs fail ...
        # return self.qadapter.more_mem_per_proc()
        try:
            self.qadapter.more_mem_per_proc()
        except QueueAdapterError:
            # here we should try to switch to another qadapter
            raise ManagerIncreaseError('manager failed to increase mem')

    def increase_ncpus(self):
        """
        Increase the number of CPUs: first ask the current qadapter; if that one raises a QueueAdapterError,
        switch to the next qadapter. If all fail, raise a ManagerIncreaseError.
        """
        try:
            self.qadapter.more_cores()
        except QueueAdapterError:
            # here we should try to switch to another qadapter
            raise ManagerIncreaseError('manager failed to increase ncpu')

    def increase_resources(self):
        try:
            self.qadapter.more_cores()
            return
        except QueueAdapterError:
            pass

        try:
            self.qadapter.more_mem_per_proc()
        except QueueAdapterError:
            # here we should try to switch to another qadapter
            raise ManagerIncreaseError('manager failed to increase resources')

    def exclude_nodes(self, nodes):
        try:
            self.qadapter.exclude_nodes(nodes=nodes)
        except QueueAdapterError:
            # here we should try to switch to another qadapter
            raise ManagerIncreaseError('manager failed to exclude nodes')

    def increase_time(self):
        try:
            self.qadapter.more_time()
        except QueueAdapterError:
            # here we should try to switch to another qadapter
            raise ManagerIncreaseError('manager failed to increase time')
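
# Illustrative sketch (hypothetical error-handling flow): when a task fails because of
# insufficient resources, the manager can be asked to bump them before resubmission.
#
#   manager = TaskManager.from_user_config()
#   try:
#       manager.increase_resources()
#   except ManagerIncreaseError:
#       print("No qadapter could provide more cores or memory")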


class AbinitBuild(object):
    """
    This object stores information on the options used to build Abinit.

    .. attribute:: info
        String with build information as produced by `abinit -b`

    .. attribute:: version
        Abinit version number e.g 8.0.1 (string)

    .. attribute:: has_netcdf
        True if netcdf is enabled.

    .. attribute:: has_libxc
        True if libxc is enabled.

    .. attribute:: has_omp
        True if OpenMP is enabled.

    .. attribute:: has_mpi
        True if MPI is enabled.

    .. attribute:: has_mpiio
        True if MPI-IO is supported.
    """
    def __init__(self, workdir=None, manager=None):
        manager = TaskManager.as_manager(manager).to_shell_manager(mpi_procs=1)

        # Build a simple manager to run the job in a shell subprocess
        import tempfile
        workdir = tempfile.mkdtemp() if workdir is None else workdir

        # Generate a shell script to execute `abinit -b`
        stdout = os.path.join(workdir, "run.abo")
        script = manager.qadapter.get_script_str(
            job_name="abinit_b",
            launch_dir=workdir,
            executable="abinit",
            qout_path=os.path.join(workdir, "queue.qout"),
            qerr_path=os.path.join(workdir, "queue.qerr"),
            #stdin=os.path.join(workdir, "run.files"),
            stdout=stdout,
            stderr=os.path.join(workdir, "run.err"),
            exec_args=["-b"],
        )

        # Execute the script.
        script_file = os.path.join(workdir, "job.sh")
        with open(script_file, "wt") as fh:
            fh.write(script)
        qjob, process = manager.qadapter.submit_to_queue(script_file)
        process.wait()

        if process.returncode != 0:
            logger.critical("Error while executing %s" % script_file)
            print("stderr:\n", process.stderr.read())
            #print("stdout:", process.stdout.read())

        # To avoid: ResourceWarning: unclosed file <_io.BufferedReader name=87> in py3k
        process.stderr.close()

        with open(stdout, "rt") as fh:
            self.info = fh.read()

        # info string has the following format.
        """
        === Build Information ===
         Version       : 8.0.1
         Build target  : x86_64_darwin15.0.0_gnu5.3
         Build date    : 20160122

        === Compiler Suite ===
         C compiler       : gnu
         C++ compiler     : gnuApple
         Fortran compiler : gnu5.3
         CFLAGS           : -g -O2 -mtune=native -march=native
         CXXFLAGS         : -g -O2 -mtune=native -march=native
         FCFLAGS          : -g -ffree-line-length-none
         FC_LDFLAGS       :

        === Optimizations ===
         Debug level        : basic
         Optimization level : standard
         Architecture       : unknown_unknown

        === Multicore ===
         Parallel build : yes
         Parallel I/O   : yes
         openMP support : no
         GPU support    : no

        === Connectors / Fallbacks ===
         Connectors on : yes
         Fallbacks on  : yes
         DFT flavor    : libxc-fallback+atompaw-fallback+wannier90-fallback
         FFT flavor    : none
         LINALG flavor : netlib
         MATH flavor   : none
         TIMER flavor  : abinit
         TRIO flavor   : netcdf+etsf_io-fallback

        === Experimental features ===
         Bindings            : @enable_bindings@
         Exports             : no
         GW double-precision : yes

        === Bazaar branch information ===
         Branch ID : gmatteo@gmac-20160112110440-lf6exhneqim9082h
         Revision  : 1226
         Committed : 0
        """
        self.version = "0.0.0"
        self.has_netcdf = False
        self.has_omp = False
        self.has_mpi, self.has_mpiio = False, False
        self.has_libxc = False

        def yesno2bool(line):
            ans = line.split()[-1].lower()
            try:
                return dict(yes=True, no=False, auto=True)[ans]
            except KeyError:
                # Temporary hack for abinit v9
                return True

        # Parse info.
        # flavor options were used in Abinit v8.
        for line in self.info.splitlines():
            if "Version" in line: self.version = line.split()[-1]
            if "TRIO flavor" in line:
                self.has_netcdf = "netcdf" in line
            if "NetCDF Fortran" in line:
                self.has_netcdf = yesno2bool(line)
            if "DFT flavor" in line:
                self.has_libxc = "libxc" in line
            if "LibXC" in line:
                self.has_libxc = yesno2bool(line)
            if "openMP support" in line:
                self.has_omp = yesno2bool(line)
            if "Parallel build" in line:
                ans = line.split()[-1].lower()
                if ans == "@enable_mpi@":
                    # Temporary hack for abinit v9
                    self.has_mpi = True
                else:
                    self.has_mpi = yesno2bool(line)
            if "Parallel I/O" in line:
                self.has_mpiio = yesno2bool(line)

        # Temporary hack for abinit v9
        #from abipy.core.testing import cmp_version
        #if cmp_version(self.version, "9.0.0", op=">="):
        #    self.has_netcdf = True

    def __str__(self):
        lines = []
        app = lines.append
        app("Abinit Build Information:")
        app("    Abinit version: %s" % self.version)
        app("    MPI: %s, MPI-IO: %s, OpenMP: %s" % (self.has_mpi, self.has_mpiio, self.has_omp))
        app("    Netcdf: %s" % self.has_netcdf)
        return "\n".join(lines)

    def version_ge(self, version_string):
        """True if the Abinit version is >= version_string."""
        return self.compare_version(version_string, ">=")

    def compare_version(self, version_string, op):
        """Compare the Abinit version to `version_string` with operator `op`."""
        from pkg_resources import parse_version
        from monty.operator import operator_from_str
        op = operator_from_str(op)
        return op(parse_version(self.version), parse_version(version_string))
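
# Illustrative sketch (assumes a working manager.yml and an `abinit` executable
# available in the submission environment): query the build options of the local
# Abinit installation.
#
#   build = AbinitBuild()
#   print(build)
#   if not build.version_ge("9.0.0"):
#       print("Old Abinit version detected:", build.version)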


class FakeProcess(object):
    """
    This object is attached to a |Task| instance if the task has not been submitted.
    This trick allows us to simulate a process that is still running so that
    we can safely poll task.process.
    """
    def poll(self):
        return None

    def wait(self):
        raise RuntimeError("Cannot wait a FakeProcess")

    def communicate(self, input=None):
        raise RuntimeError("Cannot communicate with a FakeProcess")

    def kill(self):
        raise RuntimeError("Cannot kill a FakeProcess")

    @property
    def returncode(self):
        return None


class MyTimedelta(datetime.timedelta):
    """A customized version of timedelta whose __str__ method doesn't print microseconds."""
    def __new__(cls, days, seconds, microseconds):
        return datetime.timedelta.__new__(cls, days, seconds, microseconds)

    def __str__(self):
        """Remove microseconds from timedelta default __str__"""
        s = super().__str__()
        microsec = s.find(".")
        if microsec != -1: s = s[:microsec]
        return s

    @classmethod
    def as_timedelta(cls, delta):
        """Convert delta into a MyTimedelta object."""
        # Cannot monkey patch the __class__ and must pass through __new__ as the object is immutable.
        if isinstance(delta, cls): return delta
        return cls(delta.days, delta.seconds, delta.microseconds)


class TaskDateTimes(object):
    """
    Small object containing useful :class:`datetime.datetime` objects associated to important events.

    .. attributes:

        init: initialization datetime
        submission: submission datetime
        start: Begin of execution.
        end: End of execution.
    """
    def __init__(self):
        self.init = datetime.datetime.now()
        self.submission, self.start, self.end = None, None, None

    def __str__(self):
        lines = []
        app = lines.append

        app("Initialization done on: %s" % self.init)
        if self.submission is not None: app("Submitted on: %s" % self.submission)
        if self.start is not None: app("Started on: %s" % self.start)
        if self.end is not None: app("Completed on: %s" % self.end)

        return "\n".join(lines)

    def reset(self):
        """Reinitialize the counters."""
        self.init = datetime.datetime.now()
        self.submission, self.start, self.end = None, None, None

    def get_runtime(self):
        """:class:`timedelta` with the run-time, None if the Task is not running"""
        if self.start is None: return None

        if self.end is None:
            delta = datetime.datetime.now() - self.start
        else:
            delta = self.end - self.start

        return MyTimedelta.as_timedelta(delta)

    def get_time_inqueue(self):
        """
        :class:`timedelta` with the time spent in the Queue, None if the Task is not running

        .. note:

            This value is always greater than the real value computed by the resource manager
            as we start to count only when check_status sets the `Task` status to S_RUN.
        """
        if self.submission is None: return None

        if self.start is None:
            delta = datetime.datetime.now() - self.submission
        else:
            delta = self.start - self.submission
            # This happens when we read the exact start datetime from the ABINIT log file.
            if delta.total_seconds() < 0: delta = datetime.timedelta(seconds=0)

        return MyTimedelta.as_timedelta(delta)


class TaskError(NodeError):
    """Base Exception for |Task| methods"""


class TaskRestartError(TaskError):
    """Exception raised while trying to restart the |Task|."""


class Task(Node, metaclass=abc.ABCMeta):
    """
    A Task is a node that performs some kind of calculation.
    This is the base class providing low-level methods.
    """
    # Use class attributes for TaskErrors so that we don't have to import them.
    Error = TaskError
    RestartError = TaskRestartError

    # List of `AbinitEvent` subclasses that are tested in the check_status method.
    # Subclasses should provide their own list if they need to check the convergence status.
    CRITICAL_EVENTS = []

    # Prefixes for Abinit (input, output, temporary) files.
    Prefix = collections.namedtuple("Prefix", "idata odata tdata")
    pj = os.path.join

    prefix = Prefix(pj("indata", "in"), pj("outdata", "out"), pj("tmpdata", "tmp"))
    del Prefix, pj

    def __init__(self, input, workdir=None, manager=None, deps=None):
        """
        Args:
            input: |AbinitInput| object.
            workdir: Path to the working directory.
            manager: |TaskManager| object.
            deps: Dictionary specifying the dependencies of this node.
                None means that this Task has no dependency.
        """
        # Init the node
        super().__init__()

        self._input = input

        if workdir is not None:
            self.set_workdir(workdir)

        if manager is not None:
            self.set_manager(manager)

        # Handle possible dependencies.
        if deps:
            self.add_deps(deps)

        # Date-time associated to submission, start and end.
        self.datetimes = TaskDateTimes()

        # Count the number of restarts.
        self.num_restarts = 0

        self._qjob = None
        self.queue_errors = []
        self.abi_errors = []

        # Two flags that provide, dynamically, information on the scaling behaviour of a task.
        # If any fixing procedure finds non-scaling behaviour, or if a task type is clearly
        # not scaling, the corresponding flag should be switched off.
        self.mem_scales = True
        self.load_scales = True

    def __getstate__(self):
        """
        Return the state that is pickled as the contents of the instance.

        In this case we just remove the process since Subprocess objects cannot be pickled.
        This is the reason why we have to store the returncode in self._returncode instead
        of using self.process.returncode.
        """
        return {k: v for k, v in self.__dict__.items() if k not in ["_process"]}

    #@check_spectator
    def set_workdir(self, workdir, chroot=False):
        """Set the working directory. Cannot be set more than once unless chroot is True."""
        if not chroot and hasattr(self, "workdir") and self.workdir != workdir:
            raise ValueError("self.workdir != workdir: %s, %s" % (self.workdir, workdir))

        self.workdir = os.path.abspath(workdir)

        # Files required for the execution.
        self.input_file = File(os.path.join(self.workdir, "run.abi"))
        self.output_file = File(os.path.join(self.workdir, "run.abo"))
        self.files_file = File(os.path.join(self.workdir, "run.files"))
        self.job_file = File(os.path.join(self.workdir, "job.sh"))
        self.log_file = File(os.path.join(self.workdir, "run.log"))
        self.stderr_file = File(os.path.join(self.workdir, "run.err"))
        self.start_lockfile = File(os.path.join(self.workdir, "__startlock__"))
        # This file is produced by Abinit if nprocs > 1 and MPI_ABORT.
        self.mpiabort_file = File(os.path.join(self.workdir, "__ABI_MPIABORTFILE__"))

        # Directories with input|output|temporary data.
        self.wdir = Directory(self.workdir)
        self.indir = Directory(os.path.join(self.workdir, "indata"))
        self.outdir = Directory(os.path.join(self.workdir, "outdata"))
        self.tmpdir = Directory(os.path.join(self.workdir, "tmpdata"))

        # stderr and output file of the queue manager. Note extensions.
        self.qerr_file = File(os.path.join(self.workdir, "queue.qerr"))
        self.qout_file = File(os.path.join(self.workdir, "queue.qout"))

    def set_manager(self, manager):
        """Set the |TaskManager| used to launch the Task."""
        self.manager = manager.deepcopy()

    @property
    def work(self):
        """The |Work| containing this `Task`."""
        return self._work

    def set_work(self, work):
        """Set the |Work| associated to this |Task|."""
        if not hasattr(self, "_work"):
            self._work = work
        else:
            if self._work != work:
                raise ValueError("self._work != work")

    @property
    def flow(self):
        """The |Flow| containing this |Task|."""
        return self.work.flow

    @lazy_property
    def pos(self):
        """The position of the task in the |Flow|"""
        for i, task in enumerate(self.work):
            if self == task:
                return self.work.pos, i
        raise ValueError("Cannot find the position of %s in flow %s" % (self, self.flow))

    @property
    def pos_str(self):
        """String representation of self.pos"""
        return "w" + str(self.pos[0]) + "_t" + str(self.pos[1])

    @property
    def num_launches(self):
        """
        Number of launches performed. This number includes both possible ABINIT restarts
        as well as possible launches done due to errors encountered with the resource manager
        or the hardware/software."""
        return sum(q.num_launches for q in self.manager.qads)

    @property
    def input(self):
        """AbinitInput object."""
        return self._input

    def get_inpvar(self, varname, default=None):
        """Return the value of the ABINIT variable varname, or default if not present."""
        return self.input.get(varname, default)

    def set_vars(self, *args, **kwargs):
        """
        Set the values of the ABINIT variables in the input file. Return dict with the old values.
        """
        kwargs.update(dict(*args))
        old_values = {vname: self.input.get(vname) for vname in kwargs}
        self.input.set_vars(**kwargs)
        if kwargs or old_values:
            self.history.info("Setting input variables: %s" % str(kwargs))
            self.history.info("Old values: %s" % str(old_values))

        return old_values
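
    # Illustrative sketch (hypothetical variable names and values): tweak the input
    # of a task and inspect the values that were overwritten.
    #
    #   old = task.set_vars(ecut=10, nband=8)
    #   print("previous values:", old)
    #   print("current ecut:", task.get_inpvar("ecut"))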

    @property
    def initial_structure(self):
        """Initial structure of the task."""
        return self.input.structure

    def make_input(self, with_header=False):
        """Construct the input file of the calculation."""
        s = str(self.input)
        if with_header: s = str(self) + "\n" + s
        return s

    def ipath_from_ext(self, ext):
        """
        Returns the path of the input file with extension ext.
        Use it when the file does not exist yet.
        """
        return os.path.join(self.workdir, self.prefix.idata + "_" + ext)

    def opath_from_ext(self, ext):
        """
        Returns the path of the output file with extension ext.
        Use it when the file does not exist yet.
        """
        return os.path.join(self.workdir, self.prefix.odata + "_" + ext)

    @property
    @abc.abstractmethod
    def executable(self):
        """
        Path to the executable associated to the task (internally stored in self._executable).
        """

    def set_executable(self, executable):
        """Set the executable associated to this task."""
        self._executable = executable

    @property
    def process(self):
        try:
            return self._process
        except AttributeError:
            # Attach a fake process so that we can poll it.
            return FakeProcess()

    @property
    def is_abinit_task(self):
        """True if this task is a subclass of AbinitTask."""
        return isinstance(self, AbinitTask)

    @property
    def is_anaddb_task(self):
        """True if this task is a subclass of AnaddbTask."""
        return isinstance(self, AnaddbTask)

    @property
    def is_optic_task(self):
        """True if this task is a subclass of OpticTask."""
        return isinstance(self, OpticTask)

    @property
    def is_completed(self):
        """True if the task has been executed."""
        return self.status >= self.S_DONE

    @property
    def can_run(self):
        """The task can run if its status is < S_SUB and all the other dependencies (if any) are done!"""
        all_ok = all(stat == self.S_OK for stat in self.deps_status)
        return self.status < self.S_SUB and self.status != self.S_LOCKED and all_ok

    #@check_spectator
    def cancel(self):
        """Cancel the job. Returns 1 if job was cancelled."""
        if self.queue_id is None: return 0
        if self.status >= self.S_DONE: return 0

        exit_status = self.manager.cancel(self.queue_id)
        if exit_status != 0:
            self.history.warning("manager.cancel returned exit_status: %s" % exit_status)
            return 0

        # Remove output files and reset the status.
        self.history.info("Job %s cancelled by user" % self.queue_id)
        self.reset()
        return 1

    def with_fixed_mpi_omp(self, mpi_procs, omp_threads):
        """
        Disable autoparal and force execution with `mpi_procs` MPI processes
        and `omp_threads` OpenMP threads. Useful for generating benchmarks.
        """
        manager = self.manager if hasattr(self, "manager") else self.flow.manager
        self.manager = manager.new_with_fixed_mpi_omp(mpi_procs, omp_threads)

    #def set_max_ncores(self, max_ncores):
    #    """
    #    """
    #    manager = self.manager if hasattr(self, "manager") else self.flow.manager
    #    self.manager = manager.new_with_max_ncores(mpi_procs, omp_threads)

    #@check_spectator
    def _on_done(self):
        self.fix_ofiles()

    #@check_spectator
    def _on_ok(self):
        # Fix output file names.
        self.fix_ofiles()

        # Get results
        results = self.on_ok()

        self.finalized = True

        return results

    #@check_spectator
    def on_ok(self):
        """
        This method is called once the `Task` has reached status S_OK.
        Subclasses should provide their own implementation.

        Returns:
            Dictionary that must contain at least the following entries:
                returncode:
                    0 on success.
                message:
                    a string that should provide a human-readable description of what has been performed.
        """
        return dict(returncode=0, message="Calling on_all_ok of the base class!")

    #@check_spectator
    def fix_ofiles(self):
        """
        This method is called when the task reaches S_OK.
        It changes the extension of particular output files
        produced by Abinit so that the 'official' extension
        is preserved e.g. out_1WF14 --> out_1WF
        """
        filepaths = self.outdir.list_filepaths()
        #self.history.info("in fix_ofiles with filepaths %s" % list(filepaths))

        old2new = FilepathFixer().fix_paths(filepaths)

        for old, new in old2new.items():
            self.history.info("will rename old %s to new %s" % (old, new))
            os.rename(old, new)

    #@check_spectator
    def _restart(self, submit=True):
        """
        Called by restart once we have finished preparing the task for restarting.

        Return:
            True if task has been restarted
        """
        self.set_status(self.S_READY, msg="Restarted on %s" % time.asctime())

        # Increase the counter.
        self.num_restarts += 1
        self.history.info("Restarted, num_restarts %d" % self.num_restarts)

        # Reset datetimes
        self.datetimes.reset()

        # Remove the lock file
        self.start_lockfile.remove()

        if submit:
            # Relaunch the task.
            fired = self.start()
            if not fired: self.history.warning("Restart failed")
        else:
            fired = False

        return fired

    #@check_spectator
    def restart(self):
        """
        Restart the calculation. Subclasses should provide a concrete version that
        performs all the actions needed for preparing the restart and then calls self._restart
        to restart the task. The default implementation is empty.

        Returns:
            1 if job was restarted, 0 otherwise.
        """
        self.history.debug("Calling the **empty** restart method of the base class")
        return 0

    def poll(self):
        """Check if child process has terminated. Set and return returncode attribute."""
        self._returncode = self.process.poll()

        if self._returncode is not None:
            self.set_status(self.S_DONE, "status set to DONE")

        return self._returncode

    def wait(self):
        """Wait for child process to terminate. Set and return returncode attribute."""
        self._returncode = self.process.wait()
        try:
            self.process.stderr.close()
        except Exception:
            pass
        self.set_status(self.S_DONE, "status set to DONE")

        return self._returncode

    def communicate(self, input=None):
        """
        Interact with process: Send data to stdin. Read data from stdout and stderr, until end-of-file is reached.
        Wait for process to terminate. The optional input argument should be a string to be sent to the
        child process, or None, if no data should be sent to the child.

        communicate() returns a tuple (stdoutdata, stderrdata).
        """
        stdoutdata, stderrdata = self.process.communicate(input=input)
        self._returncode = self.process.returncode
        self.set_status(self.S_DONE, "status set to DONE")

        return stdoutdata, stderrdata

    def kill(self):
        """Kill the child."""
        self.process.kill()
        self.set_status(self.S_ERROR, "status set to ERROR by task.kill")
        self._returncode = self.process.returncode

    @property
    def returncode(self):
        """
        The child return code, set by poll() and wait() (and indirectly by communicate()).
        A None value indicates that the process hasn't terminated yet.
        A negative value -N indicates that the child was terminated by signal N (Unix only).
        """
        try:
            return self._returncode
        except AttributeError:
            return 0

    def reset(self):
        """
        Reset the task status. Mainly used if we made a silly mistake in the initial
        setup of the queue manager and we want to fix it and rerun the task.

        Returns:
            0 on success, 1 if reset failed.
        """
        # Can only reset tasks that are done.
        # One should be able to reset 'Submitted' tasks (sometimes, they are not in the queue
        # and we want to restart them)
        #if self.status != self.S_SUB and self.status < self.S_DONE: return 1

        # Remove output files otherwise the EventParser will think the job is still running
        self.output_file.remove()
        self.log_file.remove()
        self.stderr_file.remove()
        self.start_lockfile.remove()
        self.qerr_file.remove()
        self.qout_file.remove()
        if self.mpiabort_file.exists:
            self.mpiabort_file.remove()

        self.set_status(self.S_INIT, msg="Reset on %s" % time.asctime())
        self.num_restarts = 0
        self.set_qjob(None)

        # Reset finalized flags.
        self.work.finalized = False
        self.flow.finalized = False

        return 0

    @property
    @return_none_if_raise(AttributeError)
    def queue_id(self):
        """Queue identifier returned by the Queue manager. None if not set"""
        return self.qjob.qid

    @property
    @return_none_if_raise(AttributeError)
    def qname(self):
        """Queue name identifier returned by the Queue manager. None if not set"""
None if not set""" 1775 return self.qjob.qname 1776 1777 @property 1778 def qjob(self): 1779 return self._qjob 1780 1781 def set_qjob(self, qjob): 1782 """Set info on queue after submission.""" 1783 self._qjob = qjob 1784 1785 @property 1786 def has_queue(self): 1787 """True if we are submitting jobs via a queue manager.""" 1788 return self.manager.qadapter.QTYPE.lower() != "shell" 1789 1790 @property 1791 def num_cores(self): 1792 """Total number of CPUs used to run the task.""" 1793 return self.manager.num_cores 1794 1795 @property 1796 def mpi_procs(self): 1797 """Number of CPUs used for MPI.""" 1798 return self.manager.mpi_procs 1799 1800 @property 1801 def omp_threads(self): 1802 """Number of CPUs used for OpenMP.""" 1803 return self.manager.omp_threads 1804 1805 @property 1806 def mem_per_proc(self): 1807 """Memory per MPI process.""" 1808 return Memory(self.manager.mem_per_proc, "Mb") 1809 1810 @property 1811 def status(self): 1812 """Gives the status of the task.""" 1813 return self._status 1814 1815 def lock(self, source_node): 1816 """Lock the task, source is the |Node| that applies the lock.""" 1817 if self.status != self.S_INIT: 1818 raise ValueError("Trying to lock a task with status %s" % self.status) 1819 1820 self._status = self.S_LOCKED 1821 self.history.info("Locked by node %s", source_node) 1822 1823 def unlock(self, source_node, check_status=True): 1824 """ 1825 Unlock the task, set its status to `S_READY` so that the scheduler can submit it. 1826 source_node is the |Node| that removed the lock 1827 Call task.check_status if check_status is True. 1828 """ 1829 if self.status != self.S_LOCKED: 1830 raise RuntimeError("Trying to unlock a task with status %s" % self.status) 1831 1832 self._status = self.S_READY 1833 if check_status: self.check_status() 1834 self.history.info("Unlocked by %s", source_node) 1835 1836 #@check_spectator 1837 def set_status(self, status, msg): 1838 """ 1839 Set and return the status of the task. 1840 1841 Args: 1842 status: Status object or string representation of the status 1843 msg: string with human-readable message used in the case of errors. 1844 """ 1845 # truncate string if it's long. msg will be logged in the object and we don't want to waste memory. 1846 if len(msg) > 2000: 1847 msg = msg[:2000] 1848 msg += "\n... snip ...\n" 1849 1850 # Locked files must be explicitly unlocked 1851 if self.status == self.S_LOCKED or status == self.S_LOCKED: 1852 err_msg = ( 1853 "Locked files must be explicitly unlocked before calling set_status but\n" 1854 "task.status = %s, input status = %s" % (self.status, status)) 1855 raise RuntimeError(err_msg) 1856 1857 status = Status.as_status(status) 1858 1859 changed = True 1860 if hasattr(self, "_status"): 1861 changed = (status != self._status) 1862 1863 self._status = status 1864 1865 if status == self.S_RUN: 1866 # Set datetimes.start when the task enters S_RUN 1867 if self.datetimes.start is None: 1868 self.datetimes.start = datetime.datetime.now() 1869 1870 # Add new entry to history only if the status has changed. 
1871 if changed:
1872 if status == self.S_SUB:
1873 self.datetimes.submission = datetime.datetime.now()
1874 self.history.info("Submitted with MPI=%s, Omp=%s, Memproc=%.1f [Gb] %s " % (
1875 self.mpi_procs, self.omp_threads, self.mem_per_proc.to("Gb"), msg))
1876
1877 elif status == self.S_OK:
1878 self.history.info("Task completed %s", msg)
1879
1880 elif status == self.S_ABICRITICAL:
1881 self.history.info("Status set to S_ABICRITICAL due to: %s", msg)
1882
1883 else:
1884 self.history.info("Status changed to %s. msg: %s", status, msg)
1885
1886 #######################################################
1887 # The section below contains callbacks that should not
1888 # be executed if we are in spectator_mode
1889 #######################################################
1890 if status == self.S_DONE:
1891 # Execute the callback
1892 self._on_done()
1893
1894 if status == self.S_OK:
1895 # Finalize the task.
1896 if not self.finalized:
1897 self._on_ok()
1898
1899 # here we remove the output files of the task and of its parents.
1900 if self.gc is not None and self.gc.policy == "task":
1901 self.clean_output_files()
1902
1903 if self.status == self.S_OK:
1904 # Because _on_ok might have changed the status.
1905 self.send_signal(self.S_OK)
1906
1907 return status
1908
1909 def check_status(self):
1910 """
1911 This function checks the status of the task by inspecting the output and the
1912 error files produced by the application and by the queue manager.
1913 """
1914 # 1) see if the job is blocked
1915 # 2) see if an error occurred when the job was submitted (TODO: these problems can be solved)
1916 # 3) see if there is output
1917 # 4) see if abinit reports problems
1918 # 5) see if both err files exist and are empty
1919 # 6) no output and no err files, the job must still be running
1920 # 7) try to find out what caused the problems
1921 # 8) there is a problem but we did not figure out what ...
1922 # 9) the only way of landing here is if there is an output file but no err files...
1923
1924 # 1) A locked task can only be unlocked by calling set_status explicitly.
1925 # an errored task should not end up here, but just to be sure
1926 black_list = (self.S_LOCKED, self.S_ERROR)
1927 #if self.status in black_list: return self.status
1928
1929 # 2) Check the returncode of the job script
1930 if self.returncode != 0:
1931 msg = "job.sh return code: %s\nPerhaps the job was not submitted properly?" % self.returncode
1932 return self.set_status(self.S_QCRITICAL, msg=msg)
1933
1934 # If we have an abort file produced by Abinit
1935 if self.mpiabort_file.exists:
1936 return self.set_status(self.S_ABICRITICAL, msg="Found ABINIT abort file")
1937
1938 # Analyze the stderr file for Fortran runtime errors.
1939 # getsize is 0 if the file is empty or it does not exist.
1940 err_msg = None
1941 if self.stderr_file.getsize() != 0:
1942 err_msg = self.stderr_file.read()
1943
1944 # Analyze the stderr file of the resource manager for runtime errors.
1945 # TODO: Why are we looking for errors in queue.qerr?
1946 qerr_info = None
1947 if self.qerr_file.getsize() != 0:
1948 qerr_info = self.qerr_file.read()
1949
1950 # Analyze the stdout file of the resource manager (needed for PBS !)
1951 qout_info = None
1952 if self.qout_file.getsize():
1953 qout_info = self.qout_file.read()
1954
1955 # Start to check ABINIT status if the output file has been created.
1956 #if self.output_file.getsize() != 0:
1957 if self.output_file.exists:
1958 try:
1959 report = self.get_event_report()
1960 except Exception as exc:
1961 msg = "%s exception while parsing event_report:\n%s" % (self, exc)
1962 return self.set_status(self.S_ABICRITICAL, msg=msg)
1963
1964 if report is None:
1965 return self.set_status(self.S_ERROR, msg="Got None report!")
1966
1967 if report.run_completed:
1968 # Here we set the correct timing data reported by Abinit
1969 self.datetimes.start = report.start_datetime
1970 self.datetimes.end = report.end_datetime
1971
1972 # Check if the calculation converged.
1973 not_ok = report.filter_types(self.CRITICAL_EVENTS)
1974 if not_ok:
1975 return self.set_status(self.S_UNCONVERGED, msg='status set to UNCONVERGED based on abiout')
1976 else:
1977 return self.set_status(self.S_OK, msg="status set to OK based on abiout")
1978
1979 # Calculation still running or errors?
1980 if report.errors:
1981 # Abinit reported problems
1982 self.history.debug('Found errors in report')
1983 for error in report.errors:
1984 self.history.debug(str(error))
1985 try:
1986 self.abi_errors.append(error)
1987 except AttributeError:
1988 self.abi_errors = [error]
1989
1990 # The job is unfixable due to ABINIT errors
1991 self.history.debug("%s: Found Errors or Bugs in ABINIT main output!" % self)
1992 msg = "\n".join(map(repr, report.errors))
1993 return self.set_status(self.S_ABICRITICAL, msg=msg)
1994
1995 # 5)
1996 if self.stderr_file.exists and not err_msg:
1997 if self.qerr_file.exists and not qerr_info:
1998 # there is output and no errors
1999 # The job still seems to be running
2000 return self.set_status(self.S_RUN, msg='there is output and no errors: job still seems to be running')
2001
2002 # 6)
2003 if not self.output_file.exists:
2004 #self.history.debug("output_file does not exist")
2005 if not self.stderr_file.exists and not self.qerr_file.exists:
2006 # No output at all. The job is still in the queue.
2007 return self.status
2008
2009 # 7) Analyze the files of the resource manager, the abinit output, and the execution stderr (mvs)
2010 # MG: This section has been disabled: several portability issues
2011 # Need more robust logic in error_parser, perhaps logic provided by users via callbacks.
2012 """ 2013 if False and (qerr_info or qout_info): 2014 from abipy.flowtk.scheduler_error_parsers import get_parser 2015 scheduler_parser = get_parser(self.manager.qadapter.QTYPE, err_file=self.qerr_file.path, 2016 out_file=self.qout_file.path, run_err_file=self.stderr_file.path) 2017 2018 if scheduler_parser is None: 2019 return self.set_status(self.S_QCRITICAL, 2020 msg="Cannot find scheduler_parser for qtype %s" % self.manager.qadapter.QTYPE) 2021 2022 scheduler_parser.parse() 2023 2024 if scheduler_parser.errors: 2025 # Store the queue errors in the task 2026 self.queue_errors = scheduler_parser.errors 2027 # The job is killed or crashed and we know what happened 2028 msg = "scheduler errors found:\n%s" % str(scheduler_parser.errors) 2029 return self.set_status(self.S_QCRITICAL, msg=msg) 2030 2031 elif lennone(qerr_info) > 0: 2032 # if only qout_info, we are not necessarily in QCRITICAL state, 2033 # since there will always be info in the qout file 2034 self.history.info('Found unknown message in the queue qerr file: %s' % str(qerr_info)) 2035 #try: 2036 # rt = self.datetimes.get_runtime().seconds 2037 #except Exception: 2038 # rt = -1.0 2039 #tl = self.manager.qadapter.timelimit 2040 #if rt > tl: 2041 # msg += 'set to error : runtime (%s) exceded walltime (%s)' % (rt, tl) 2042 # print(msg) 2043 # return self.set_status(self.S_ERROR, msg=msg) 2044 # The job may be killed or crashed but we don't know what happened 2045 # It may also be that an innocent message was written to qerr, so we wait for a while 2046 # it is set to QCritical, we will attempt to fix it by running on more resources 2047 """ 2048 2049 # 8) analyzing the err files and abinit output did not identify a problem 2050 # but if the files are not empty we do have a problem but no way of solving it: 2051 # The job is killed or crashed but we don't know what happend 2052 # it is set to QCritical, we will attempt to fix it by running on more resources 2053 if err_msg: 2054 msg = 'Found error message:\n %s' % str(err_msg) 2055 self.history.warning(msg) 2056 #return self.set_status(self.S_QCRITICAL, msg=msg) 2057 2058 # 9) if we still haven't returned there is no indication of any error and the job can only still be running 2059 # but we should actually never land here, or we have delays in the file system .... 2060 # print('the job still seems to be running maybe it is hanging without producing output... ') 2061 2062 # Check time of last modification. 2063 if self.output_file.exists and \ 2064 (time.time() - self.output_file.get_stat().st_mtime > self.manager.policy.frozen_timeout): 2065 msg = "Task seems to be frozen, last change more than %s [s] ago" % self.manager.policy.frozen_timeout 2066 return self.set_status(self.S_ERROR, msg=msg) 2067 2068 # Handle weird case in which either run.abo, or run.log have not been produced 2069 #if self.status not in (self.S_INIT, self.S_READY) and (not self.output.file.exists or not self.log_file.exits): 2070 # msg = "Task have been submitted but cannot find the log file or the output file" 2071 # return self.set_status(self.S_ERROR, msg) 2072 2073 return self.set_status(self.S_RUN, msg='final option: nothing seems to be wrong, the job must still be running') 2074 2075 def reduce_memory_demand(self): 2076 """ 2077 Method that can be called by the scheduler to decrease the memory demand of a specific task. 2078 Returns True in case of success, False in case of Failure. 2079 Should be overwritten by specific tasks. 
2080 """ 2081 return False 2082 2083 def speed_up(self): 2084 """ 2085 Method that can be called by the flow to decrease the time needed for a specific task. 2086 Returns True in case of success, False in case of Failure 2087 Should be overwritten by specific tasks. 2088 """ 2089 return False 2090 2091 def out_to_in(self, out_file): 2092 """ 2093 Move an output file to the output data directory of the `Task` 2094 and rename the file so that ABINIT will read it as an input data file. 2095 2096 Returns: 2097 The absolute path of the new file in the indata directory. 2098 """ 2099 in_file = os.path.basename(out_file).replace("out", "in", 1) 2100 dest = os.path.join(self.indir.path, in_file) 2101 2102 if os.path.exists(dest) and not os.path.islink(dest): 2103 self.history.warning("Will overwrite %s with %s" % (dest, out_file)) 2104 2105 os.rename(out_file, dest) 2106 return dest 2107 2108 def inlink_file(self, filepath): 2109 """ 2110 Create a symbolic link to the specified file in the 2111 directory containing the input files of the task. 2112 """ 2113 if not os.path.exists(filepath): 2114 self.history.debug("Creating symbolic link to not existent file %s" % filepath) 2115 2116 # Extract the Abinit extension and add the prefix for input files. 2117 root, abiext = abi_splitext(filepath) 2118 2119 infile = "in_" + abiext 2120 infile = self.indir.path_in(infile) 2121 2122 # Link path to dest if dest link does not exist. 2123 # else check that it points to the expected file. 2124 self.history.info("Linking path %s --> %s" % (filepath, infile)) 2125 2126 if not os.path.exists(infile): 2127 os.symlink(filepath, infile) 2128 else: 2129 if os.path.realpath(infile) != filepath: 2130 raise self.Error("infile %s does not point to filepath %s" % (infile, filepath)) 2131 2132 def make_links(self): 2133 """ 2134 Create symbolic links to the output files produced by the other tasks. 2135 2136 .. warning:: 2137 2138 This method should be called only when the calculation is READY because 2139 it uses a heuristic approach to find the file to link. 2140 """ 2141 for dep in self.deps: 2142 filepaths, exts = dep.get_filepaths_and_exts() 2143 2144 for path, ext in zip(filepaths, exts): 2145 self.history.info("Need path %s with ext %s" % (path, ext)) 2146 dest = self.ipath_from_ext(ext) 2147 2148 if not os.path.exists(path): 2149 # Try netcdf file. 2150 # TODO: this case should be treated in a cleaner way. 2151 path += ".nc" 2152 if os.path.exists(path): dest += ".nc" 2153 2154 if not os.path.exists(path): 2155 raise self.Error("\n%s: path `%s`\n is needed by this task but it does not exist" % (self, path)) 2156 2157 if path.endswith(".nc") and not dest.endswith(".nc"): # NC --> NC file 2158 dest += ".nc" 2159 2160 # Link path to dest if dest link does not exist. 2161 # else check that it points to the expected file. 2162 self.history.debug("Linking path %s --> %s" % (path, dest)) 2163 if not os.path.exists(dest): 2164 os.symlink(path, dest) 2165 else: 2166 # check links but only if we haven't performed the restart. 2167 # in this case, indeed we may have replaced the file pointer with the 2168 # previous output file of the present task. 
2169 if os.path.realpath(dest) != path and self.num_restarts == 0: 2170 raise self.Error("\nDestination:\n `%s`\ndoes not point to path:\n `%s`" % (dest, path)) 2171 2172 @abc.abstractmethod 2173 def setup(self): 2174 """Public method called before submitting the task.""" 2175 2176 def _setup(self): 2177 """ 2178 This method calls self.setup after having performed additional operations 2179 such as the creation of the symbolic links needed to connect different tasks. 2180 """ 2181 self.make_links() 2182 self.setup() 2183 2184 def get_event_report(self, source="log"): 2185 """ 2186 Analyzes the main logfile of the calculation for possible Errors or Warnings. 2187 If the ABINIT abort file is found, the error found in this file are added to 2188 the output report. 2189 2190 Args: 2191 source: "output" for the main output file,"log" for the log file. 2192 2193 Returns: 2194 :class:`EventReport` instance or None if the source file file does not exist. 2195 """ 2196 # By default, we inspect the main log file. 2197 ofile = { 2198 "output": self.output_file, 2199 "log": self.log_file}[source] 2200 2201 parser = events.EventsParser() 2202 2203 if not ofile.exists: 2204 if not self.mpiabort_file.exists: 2205 return None 2206 else: 2207 # ABINIT abort file without log! 2208 abort_report = parser.parse(self.mpiabort_file.path) 2209 return abort_report 2210 2211 try: 2212 report = parser.parse(ofile.path) 2213 #self._prev_reports[source] = report 2214 2215 # Add events found in the ABI_MPIABORTFILE. 2216 if self.mpiabort_file.exists: 2217 self.history.critical("Found ABI_MPIABORTFILE!!!!!") 2218 abort_report = parser.parse(self.mpiabort_file.path) 2219 if len(abort_report) != 1: 2220 self.history.critical("Found more than one event in ABI_MPIABORTFILE") 2221 2222 # Weird case: empty abort file, let's skip the part 2223 # below and hope that the log file contains the error message. 2224 #if not len(abort_report): return report 2225 2226 # Add it to the initial report only if it differs 2227 # from the last one found in the main log file. 2228 last_abort_event = abort_report[-1] 2229 if report and last_abort_event != report[-1]: 2230 report.append(last_abort_event) 2231 else: 2232 report.append(last_abort_event) 2233 2234 return report 2235 2236 #except parser.Error as exc: 2237 except Exception as exc: 2238 # Return a report with an error entry with info on the exception. 2239 msg = "%s: Exception while parsing ABINIT events:\n %s" % (ofile, str(exc)) 2240 self.set_status(self.S_ABICRITICAL, msg=msg) 2241 return parser.report_exception(ofile.path, exc) 2242 2243 def get_results(self, **kwargs): 2244 """ 2245 Returns :class:`NodeResults` instance. 2246 Subclasses should extend this method (if needed) by adding 2247 specialized code that performs some kind of post-processing. 2248 """ 2249 # Check whether the process completed. 2250 if self.returncode is None: 2251 raise self.Error("return code is None, you should call wait, communicate or poll") 2252 2253 if self.status is None or self.status < self.S_DONE: 2254 raise self.Error("Task is not completed") 2255 2256 return self.Results.from_node(self) 2257 2258 def move(self, dest, is_abspath=False): 2259 """ 2260 Recursively move self.workdir to another location. This is similar to the Unix "mv" command. 2261 The destination path must not already exist. If the destination already exists 2262 but is not a directory, it may be overwritten depending on os.rename() semantics. 2263 2264 Be default, dest is located in the parent directory of self.workdir. 
2265 Use is_abspath=True to specify an absolute path. 2266 """ 2267 if not is_abspath: 2268 dest = os.path.join(os.path.dirname(self.workdir), dest) 2269 2270 shutil.move(self.workdir, dest) 2271 2272 def in_files(self): 2273 """Return all the input data files used.""" 2274 return self.indir.list_filepaths() 2275 2276 def out_files(self): 2277 """Return all the output data files produced.""" 2278 return self.outdir.list_filepaths() 2279 2280 def tmp_files(self): 2281 """Return all the input data files produced.""" 2282 return self.tmpdir.list_filepaths() 2283 2284 def path_in_workdir(self, filename): 2285 """Create the absolute path of filename in the top-level working directory.""" 2286 return os.path.join(self.workdir, filename) 2287 2288 def rename(self, src_basename, dest_basename, datadir="outdir"): 2289 """ 2290 Rename a file located in datadir. 2291 2292 src_basename and dest_basename are the basename of the source file 2293 and of the destination file, respectively. 2294 """ 2295 directory = { 2296 "indir": self.indir, 2297 "outdir": self.outdir, 2298 "tmpdir": self.tmpdir, 2299 }[datadir] 2300 2301 src = directory.path_in(src_basename) 2302 dest = directory.path_in(dest_basename) 2303 2304 os.rename(src, dest) 2305 2306 #@check_spectator 2307 def build(self, *args, **kwargs): 2308 """ 2309 Creates the working directory and the input files of the |Task|. 2310 It does not overwrite files if they already exist. 2311 """ 2312 # Create dirs for input, output and tmp data. 2313 self.indir.makedirs() 2314 self.outdir.makedirs() 2315 self.tmpdir.makedirs() 2316 2317 # Write files file and input file. 2318 if not self.files_file.exists: 2319 self.files_file.write(self.filesfile_string) 2320 2321 self.input_file.write(self.make_input()) 2322 self.manager.write_jobfile(self) 2323 2324 #@check_spectator 2325 def rmtree(self, exclude_wildcard=""): 2326 """ 2327 Remove all files and directories in the working directory 2328 2329 Args: 2330 exclude_wildcard: Optional string with regular expressions separated by |. 2331 Files matching one of the regular expressions will be preserved. 2332 example: exclude_wildcard="*.nc|*.txt" preserves all the files whose extension is in ["nc", "txt"]. 2333 """ 2334 if not exclude_wildcard: 2335 shutil.rmtree(self.workdir) 2336 2337 else: 2338 w = WildCard(exclude_wildcard) 2339 2340 for dirpath, dirnames, filenames in os.walk(self.workdir): 2341 for fname in filenames: 2342 filepath = os.path.join(dirpath, fname) 2343 if not w.match(fname): 2344 os.remove(filepath) 2345 2346 def remove_files(self, *filenames): 2347 """Remove all the files listed in filenames.""" 2348 filenames = list_strings(filenames) 2349 2350 for dirpath, dirnames, fnames in os.walk(self.workdir): 2351 for fname in fnames: 2352 if fname in filenames: 2353 filepath = os.path.join(dirpath, fname) 2354 os.remove(filepath) 2355 2356 def clean_output_files(self, follow_parents=True): 2357 """ 2358 This method is called when the task reaches S_OK. It removes all the output files 2359 produced by the task that are not needed by its children as well as the output files 2360 produced by its parents if no other node needs them. 2361 2362 Args: 2363 follow_parents: If true, the output files of the parents nodes will be removed if possible. 2364 2365 Return: 2366 list with the absolute paths of the files that have been removed. 
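
Example (usage sketch): ``removed = task.clean_output_files(follow_parents=False)``
cleans tmpdir and this task's outdir (files still needed by its children are kept)
and returns the list of removed paths, without touching the parents' outdirs.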
2367 """ 2368 paths = [] 2369 if self.status != self.S_OK: 2370 self.history.warning("Calling task.clean_output_files on a task whose status != S_OK") 2371 2372 # Remove all files in tmpdir. 2373 self.tmpdir.clean() 2374 2375 # Find the file extensions that should be preserved since these files are still 2376 # needed by the children who haven't reached S_OK 2377 except_exts = set() 2378 for child in self.get_children(): 2379 if child.status == self.S_OK: continue 2380 # Find the position of self in child.deps and add the extensions. 2381 i = [dep.node for dep in child.deps].index(self) 2382 except_exts.update(child.deps[i].exts) 2383 2384 # Remove the files in the outdir of the task but keep except_exts. 2385 exts = self.gc.exts.difference(except_exts) 2386 #print("Will remove its extensions: ", exts) 2387 paths += self.outdir.remove_exts(exts) 2388 if not follow_parents: return paths 2389 2390 # Remove the files in the outdir of my parents if all the possible dependencies have been fulfilled. 2391 for parent in self.get_parents(): 2392 2393 # Here we build a dictionary file extension --> list of child nodes requiring this file from parent 2394 # e.g {"WFK": [node1, node2]} 2395 ext2nodes = collections.defaultdict(list) 2396 for child in parent.get_children(): 2397 if child.status == child.S_OK: continue 2398 i = [d.node for d in child.deps].index(parent) 2399 for ext in child.deps[i].exts: 2400 ext2nodes[ext].append(child) 2401 2402 # Remove extension only if no node depends on it! 2403 except_exts = [k for k, lst in ext2nodes.items() if lst] 2404 exts = self.gc.exts.difference(except_exts) 2405 #print("%s removes extensions %s from parent node %s" % (self, exts, parent)) 2406 paths += parent.outdir.remove_exts(exts) 2407 2408 self.history.info("Removed files: %s" % paths) 2409 return paths 2410 2411 def setup(self): # noqa: E731,F811 2412 """Base class does not provide any hook.""" 2413 2414 #@check_spectator 2415 def start(self, **kwargs): 2416 """ 2417 Starts the calculation by performing the following steps: 2418 2419 - build dirs and files 2420 - call the _setup method 2421 - execute the job file by executing/submitting the job script. 2422 2423 Main entry point for the `Launcher`. 2424 2425 ============== ============================================================== 2426 kwargs Meaning 2427 ============== ============================================================== 2428 autoparal False to skip the autoparal step (default True) 2429 exec_args List of arguments passed to executable. 2430 ============== ============================================================== 2431 2432 Returns: 2433 1 if task was started, 0 otherwise. 2434 """ 2435 if self.status >= self.S_SUB: 2436 raise self.Error("Task status: %s" % str(self.status)) 2437 2438 if self.start_lockfile.exists: 2439 self.history.warning("Found lock file: %s" % self.start_lockfile.path) 2440 return 0 2441 2442 self.start_lockfile.write("Started on %s" % time.asctime()) 2443 2444 self.build() 2445 self._setup() 2446 2447 # Add the variables needed to connect the node. 
2448 for d in self.deps: 2449 cvars = d.connecting_vars() 2450 self.history.info("Adding connecting vars %s" % cvars) 2451 self.set_vars(cvars) 2452 2453 # Get (python) data from other nodes 2454 d.apply_getters(self) 2455 2456 # Automatic parallelization 2457 if kwargs.pop("autoparal", True) and hasattr(self, "autoparal_run"): 2458 try: 2459 self.autoparal_run() 2460 #except QueueAdapterError as exc: 2461 # # If autoparal cannot find a qadapter to run the calculation raises an Exception 2462 # self.history.critical(exc) 2463 # msg = "Error while trying to run autoparal in task:%s\n%s" % (repr(task), straceback()) 2464 # cprint(msg, "yellow") 2465 # self.set_status(self.S_QCRITICAL, msg=msg) 2466 # return 0 2467 except Exception as exc: 2468 # Sometimes autoparal_run fails because Abinit aborts 2469 # at the level of the parser e.g. cannot find the spacegroup 2470 # due to some numerical noise in the structure. 2471 # In this case we call fix_abicritical and then we try to run autoparal again. 2472 self.history.critical("First call to autoparal failed with `%s`. Will try fix_abicritical" % exc) 2473 msg = "autoparal_fake_run raised:\n%s" % straceback() 2474 self.history.critical(msg) 2475 2476 fixed = self.fix_abicritical() 2477 if not fixed: 2478 self.set_status(self.S_ABICRITICAL, msg="fix_abicritical could not solve the problem") 2479 return 0 2480 2481 try: 2482 self.autoparal_run() 2483 self.history.info("Second call to autoparal succeeded!") 2484 #cprint("Second call to autoparal succeeded!", "green") 2485 2486 except Exception as exc: 2487 self.history.critical("Second call to autoparal failed with %s. Cannot recover!", exc) 2488 msg = "Tried autoparal again but got:\n%s" % straceback() 2489 cprint(msg, "red") 2490 self.set_status(self.S_ABICRITICAL, msg=msg) 2491 return 0 2492 2493 # Start the calculation in a subprocess and return. 2494 self._process = self.manager.launch(self, **kwargs) 2495 return 1 2496 2497 def start_and_wait(self, *args, **kwargs): 2498 """ 2499 Helper method to start the task and wait for completion. 2500 2501 Mainly used when we are submitting the task via the shell without passing through a queue manager. 2502 """ 2503 self.start(*args, **kwargs) 2504 retcode = self.wait() 2505 return retcode 2506 2507 def get_graphviz(self, engine="automatic", graph_attr=None, node_attr=None, edge_attr=None): 2508 """ 2509 Generate task graph in the DOT language (only parents and children of this task). 2510 2511 Args: 2512 engine: ['dot', 'neato', 'twopi', 'circo', 'fdp', 'sfdp', 'patchwork', 'osage'] 2513 graph_attr: Mapping of (attribute, value) pairs for the graph. 2514 node_attr: Mapping of (attribute, value) pairs set for all nodes. 2515 edge_attr: Mapping of (attribute, value) pairs set for all edges. 2516 2517 Returns: graphviz.Digraph <https://graphviz.readthedocs.io/en/stable/api.html#digraph> 2518 """ 2519 # https://www.graphviz.org/doc/info/ 2520 from graphviz import Digraph 2521 fg = Digraph("task", # filename="task_%s.gv" % os.path.basename(self.workdir), 2522 engine="dot" if engine == "automatic" else engine) 2523 2524 # Set graph attributes. 2525 #fg.attr(label="%s@%s" % (self.__class__.__name__, self.relworkdir)) 2526 fg.attr(label=repr(self)) 2527 #fg.attr(fontcolor="white", bgcolor='purple:pink') 2528 #fg.attr(rankdir="LR", pagedir="BL") 2529 #fg.attr(constraint="false", pack="true", packMode="clust") 2530 fg.node_attr.update(color='lightblue2', style='filled') 2531 2532 # Add input attributes. 
2533 if graph_attr is not None: 2534 fg.graph_attr.update(**graph_attr) 2535 if node_attr is not None: 2536 fg.node_attr.update(**node_attr) 2537 if edge_attr is not None: 2538 fg.edge_attr.update(**edge_attr) 2539 2540 def node_kwargs(node): 2541 return dict( 2542 #shape="circle", 2543 color=node.color_hex, 2544 label=(str(node) if not hasattr(node, "pos_str") else 2545 node.pos_str + "\n" + node.__class__.__name__), 2546 ) 2547 2548 edge_kwargs = dict(arrowType="vee", style="solid") 2549 cluster_kwargs = dict(rankdir="LR", pagedir="BL", style="rounded", bgcolor="azure2") 2550 2551 # Build cluster with tasks. 2552 cluster_name = "cluster%s" % self.work.name 2553 with fg.subgraph(name=cluster_name) as wg: 2554 wg.attr(**cluster_kwargs) 2555 wg.attr(label="%s (%s)" % (self.__class__.__name__, self.name)) 2556 wg.node(self.name, **node_kwargs(self)) 2557 2558 # Connect task to children. 2559 for child in self.get_children(): 2560 # Test if child is in the same work. 2561 myg = wg if child in self.work else fg 2562 myg.node(child.name, **node_kwargs(child)) 2563 # Find file extensions required by this task 2564 i = [dep.node for dep in child.deps].index(self) 2565 edge_label = "+".join(child.deps[i].exts) 2566 myg.edge(self.name, child.name, label=edge_label, color=self.color_hex, 2567 **edge_kwargs) 2568 2569 # Connect task to parents 2570 for parent in self.get_parents(): 2571 # Test if parent is in the same work. 2572 myg = wg if parent in self.work else fg 2573 myg.node(parent.name, **node_kwargs(parent)) 2574 # Find file extensions required by self (task) 2575 i = [dep.node for dep in self.deps].index(parent) 2576 edge_label = "+".join(self.deps[i].exts) 2577 myg.edge(parent.name, self.name, label=edge_label, color=parent.color_hex, 2578 **edge_kwargs) 2579 2580 # Treat the case in which we have a work producing output for other tasks. 2581 #for work in self: 2582 # children = work.get_children() 2583 # if not children: continue 2584 # cluster_name = "cluster%s" % work.name 2585 # seen = set() 2586 # for child in children: 2587 # # This is not needed, too much confusing 2588 # #fg.edge(cluster_name, child.name, color=work.color_hex, **edge_kwargs) 2589 # # Find file extensions required by work 2590 # i = [dep.node for dep in child.deps].index(work) 2591 # for ext in child.deps[i].exts: 2592 # out = "%s (%s)" % (ext, work.name) 2593 # fg.node(out) 2594 # fg.edge(out, child.name, **edge_kwargs) 2595 # key = (cluster_name, out) 2596 # if key not in seen: 2597 # fg.edge(cluster_name, out, color=work.color_hex, **edge_kwargs) 2598 # seen.add(key) 2599 2600 return fg 2601 2602 2603class DecreaseDemandsError(Exception): 2604 """ 2605 exception to be raised by a task if the request to decrease some demand, load or memory, could not be performed 2606 """ 2607 2608 2609class AbinitTask(Task): 2610 """ 2611 Base class defining an ABINIT calculation 2612 """ 2613 Results = TaskResults 2614 2615 @classmethod 2616 def from_input(cls, input, workdir=None, manager=None): 2617 """ 2618 Create an instance of `AbinitTask` from an ABINIT input. 2619 2620 Args: 2621 ainput: |AbinitInput| object. 2622 workdir: Path to the working directory. 2623 manager: |TaskManager| object. 2624 """ 2625 return cls(input, workdir=workdir, manager=manager) 2626 2627 @classmethod 2628 def temp_shell_task(cls, inp, mpi_procs=1, workdir=None, manager=None): 2629 """ 2630 Build a Task with a temporary workdir. The task is executed via the shell with 1 MPI proc. 
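Usage sketch (assuming ``inp`` is an |AbinitInput|)::

    task = AbinitTask.temp_shell_task(inp)
    retcode = task.start_and_wait()
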
2631 Mainly used for invoking Abinit to get important parameters needed to prepare the real task.
2632
2633 Args:
2634 mpi_procs: Number of MPI processes to use.
2635 """
2636 # Build a simple manager to run the job in a shell subprocess
2637 # Allow users to specify the temporary directory via env variable.
2638 dir = os.getenv("ABIPY_TMPDIR", default=None)
2639 import tempfile
2640 workdir = tempfile.mkdtemp(dir=dir) if workdir is None else workdir
2641 if manager is None: manager = TaskManager.from_user_config()
2642
2643 # Construct the task and run it
2644 task = cls.from_input(inp, workdir=workdir, manager=manager.to_shell_manager(mpi_procs=mpi_procs))
2645 task.set_name('temp_shell_task')
2646 return task
2647
2648 def setup(self):
2649 """
2650 Abinit has the very *bad* habit of changing the file extension by appending the characters in [A,B ..., Z]
2651 to the output file, and this breaks a lot of code that relies on the use of a unique file extension.
2652 Here we fix this issue by renaming run.abo to run.abo_[number] if the output file "run.abo" already
2653 exists. A few lines of code in python, a lot of problems if you try to implement this trick in Fortran90.
2654 """
2655 def rename_file(afile):
2656 """Helper function to rename :class:`File` objects. Return string for logging purpose."""
2657 # Find the index of the last file (if any).
2658 # TODO: Maybe it's better to use run.abo --> run(1).abo
2659 fnames = [f for f in os.listdir(self.workdir) if f.startswith(afile.basename)]
2660 nums = [int(f) for f in [f.split("_")[-1] for f in fnames] if f.isdigit()]
2661 last = max(nums) if nums else 0
2662 new_path = afile.path + "_" + str(last+1)
2663
2664 os.rename(afile.path, new_path)
2665 return "Will rename %s to %s" % (afile.path, new_path)
2666
2667 logs = []
2668 if self.output_file.exists: logs.append(rename_file(self.output_file))
2669 if self.log_file.exists: logs.append(rename_file(self.log_file))
2670
2671 if logs:
2672 self.history.info("\n".join(logs))
2673
2674 @property
2675 def executable(self):
2676 """Path to the executable required for running the Task."""
2677 try:
2678 return self._executable
2679 except AttributeError:
2680 return "abinit"
2681
2682 @property
2683 def pseudos(self):
2684 """List of pseudos used in the calculation."""
2685 return self.input.pseudos
2686
2687 @property
2688 def isnc(self):
2689 """True if norm-conserving calculation."""
2690 return self.input.isnc
2691
2692 @property
2693 def ispaw(self):
2694 """True if PAW calculation."""
2695 return self.input.ispaw
2696
2697 @property
2698 def is_gs_task(self):
2699 """True if task is a GsTask subclass."""
2700 return isinstance(self, GsTask)
2701
2702 @property
2703 def is_dfpt_task(self):
2704 """True if task is a DfptTask subclass."""
2705 return isinstance(self, DfptTask)
2706
2707 @lazy_property
2708 def cycle_class(self):
2709 """
2710 Return the subclass of ScfCycle associated with the task, or
2711 None if no SCF algorithm is associated with the task.
2712 """ 2713 if isinstance(self, RelaxTask): 2714 return abiinspect.Relaxation 2715 elif isinstance(self, GsTask): 2716 return abiinspect.GroundStateScfCycle 2717 elif self.is_dfpt_task: 2718 return abiinspect.D2DEScfCycle 2719 2720 return None 2721 2722 @property 2723 def filesfile_string(self): 2724 """String with the list of files and prefixes needed to execute ABINIT.""" 2725 lines = [] 2726 app = lines.append 2727 pj = os.path.join 2728 2729 app(self.input_file.path) # Path to the input file 2730 app(self.output_file.path) # Path to the output file 2731 app(pj(self.workdir, self.prefix.idata)) # Prefix for input data 2732 app(pj(self.workdir, self.prefix.odata)) # Prefix for output data 2733 app(pj(self.workdir, self.prefix.tdata)) # Prefix for temporary data 2734 2735 # Paths to the pseudopotential files. 2736 # Note that here the pseudos **must** be sorted according to znucl. 2737 # Here we reorder the pseudos if the order is wrong. 2738 ord_pseudos = [] 2739 2740 znucl = [specie.number for specie in self.input.structure.species_by_znucl] 2741 2742 for z in znucl: 2743 for p in self.pseudos: 2744 if p.Z == z: 2745 ord_pseudos.append(p) 2746 break 2747 else: 2748 raise ValueError("Cannot find pseudo with znucl %s in pseudos:\n%s" % (z, self.pseudos)) 2749 2750 for pseudo in ord_pseudos: 2751 app(pseudo.path) 2752 2753 return "\n".join(lines) 2754 2755 def set_pconfs(self, pconfs): 2756 """Set the list of autoparal configurations.""" 2757 self._pconfs = pconfs 2758 2759 @property 2760 def pconfs(self): 2761 """List of autoparal configurations.""" 2762 try: 2763 return self._pconfs 2764 except AttributeError: 2765 return None 2766 2767 def uses_paral_kgb(self, value=1): 2768 """True if the task is a GS Task and uses paral_kgb with the given value.""" 2769 paral_kgb = self.get_inpvar("paral_kgb", 0) 2770 # paral_kgb is used only in the GS part. 2771 return paral_kgb == value and isinstance(self, GsTask) 2772 2773 def _change_structure(self, new_structure): 2774 """Change the input structure.""" 2775 # Compare new and old structure for logging purpose. 
2776 # TODO: Write method of structure to compare self and other and return a dictionary
2777 old_structure = self.input.structure
2778 old_lattice = old_structure.lattice
2779
2780 abc_diff = np.array(new_structure.lattice.abc) - np.array(old_lattice.abc)
2781 angles_diff = np.array(new_structure.lattice.angles) - np.array(old_lattice.angles)
2782 cart_diff = new_structure.cart_coords - old_structure.cart_coords
2783 displs = np.array([np.sqrt(np.dot(v, v)) for v in cart_diff])
2784
2785 recs, tol_angle, tol_length = [], 10**-2, 10**-5
2786
2787 if np.any(np.abs(angles_diff) > tol_angle):
2788 recs.append("new_angles - old_angles = %s" % angles_diff)
2789
2790 if np.any(np.abs(abc_diff) > tol_length):
2791 recs.append("new_abc - old_abc = %s" % abc_diff)
2792
2793 if np.any(np.abs(displs) > tol_length):
2794 min_pos, max_pos = displs.argmin(), displs.argmax()
2795 recs.append("Mean displ: %.2E, Max_displ: %.2E (site %d), min_displ: %.2E (site %d)" %
2796 (displs.mean(), displs[max_pos], max_pos, displs[min_pos], min_pos))
2797
2798 self.history.info("Changing structure (only significant diffs are shown):")
2799 if not recs:
2800 self.history.info("Input and output structures seem to be equal within the given tolerances")
2801 else:
2802 for rec in recs:
2803 self.history.info(rec)
2804
2805 self.input.set_structure(new_structure)
2806 #assert self.input.structure == new_structure
2807
2808 def autoparal_run(self):
2809 """
2810 Find an optimal set of parameters for the execution of the task.
2811 This method can change the ABINIT input variables and/or the
2812 submission parameters e.g. the number of CPUs for MPI and OpenMP.
2813
2814 Set:
2815 self.pconfs where pconfs is a :class:`ParalHints` object with the configurations reported by
2816 autoparal; the optimal configuration is then selected and applied.
2817 Returns 0 on success.
2818 """
2819 policy = self.manager.policy
2820
2821 if policy.autoparal == 0: # or policy.max_ncpus in [None, 1]:
2822 self.history.info("Nothing to do in autoparal, returning 0")
2823 return 0
2824
2825 if policy.autoparal != 1:
2826 raise NotImplementedError("autoparal != 1")
2827
2828 ############################################################################
2829 # Run ABINIT in sequential to get the possible configurations with max_ncpus
2830 ############################################################################
2831
2832 # Set the variables for automatic parallelization
2833 # Will get all the possible configurations up to max_ncpus
2834 # Return immediately if max_ncpus == 1
2835 max_ncpus = self.manager.max_cores
2836 if max_ncpus == 1: return 0
2837
2838 autoparal_vars = dict(autoparal=policy.autoparal, max_ncpus=max_ncpus, mem_test=0)
2839 self.set_vars(autoparal_vars)
2840
2841 # Run the job in a shell subprocess with mpi_procs = 1
2842 # we don't want to make a request to the queue manager for this simple job!
2843 # Return code is always != 0 2844 process = self.manager.to_shell_manager(mpi_procs=1).launch(self) 2845 self.history.pop() 2846 retcode = process.wait() 2847 # To avoid: ResourceWarning: unclosed file <_io.BufferedReader name=87> in py3k 2848 process.stderr.close() 2849 #process.stdout.close() 2850 2851 # Remove the variables added for the automatic parallelization 2852 self.input.remove_vars(list(autoparal_vars.keys())) 2853 2854 ############################################################## 2855 # Parse the autoparal configurations from the main output file 2856 ############################################################## 2857 parser = ParalHintsParser() 2858 try: 2859 pconfs = parser.parse(self.output_file.path) 2860 except parser.Error: 2861 # In principle Abinit should have written a complete log file 2862 # because we called .wait() but sometimes the Yaml doc is incomplete and 2863 # the parser raises. Let's wait 5 secs and then try again. 2864 time.sleep(5) 2865 try: 2866 pconfs = parser.parse(self.output_file.path) 2867 except parser.Error: 2868 self.history.critical("Error while parsing Autoparal section:\n%s" % straceback()) 2869 return 2 2870 2871 if "paral_kgb" not in self.input: 2872 self.input.set_vars(paral_kgb=pconfs.info.get("paral_kgb", 0)) 2873 2874 ###################################################### 2875 # Select the optimal configuration according to policy 2876 ###################################################### 2877 optconf = self.find_optconf(pconfs) 2878 2879 #################################################### 2880 # Change the input file and/or the submission script 2881 #################################################### 2882 self.set_vars(optconf.vars) 2883 2884 # Write autoparal configurations to JSON file. 2885 d = pconfs.as_dict() 2886 d["optimal_conf"] = optconf 2887 json_pretty_dump(d, os.path.join(self.workdir, "autoparal.json")) 2888 2889 ############## 2890 # Finalization 2891 ############## 2892 # Reset the status, remove garbage files ... 2893 self.set_status(self.S_INIT, msg='finished autoparal run') 2894 2895 # Remove the output file since Abinit likes to create new files 2896 # with extension .outA, .outB if the file already exists. 2897 os.remove(self.output_file.path) 2898 os.remove(self.log_file.path) 2899 os.remove(self.stderr_file.path) 2900 2901 return 0 2902 2903 def find_optconf(self, pconfs): 2904 """Find the optimal Parallel configuration.""" 2905 # Save pconfs for future reference. 2906 self.set_pconfs(pconfs) 2907 2908 # Select the partition on which we'll be running and set MPI/OMP cores. 2909 optconf = self.manager.select_qadapter(pconfs) 2910 return optconf 2911 2912 def select_files(self, what="o"): 2913 """ 2914 Helper function used to select the files of a task. 2915 2916 Args: 2917 what: string with the list of characters selecting the file type 2918 Possible choices: 2919 i ==> input_file, 2920 o ==> output_file, 2921 f ==> files_file, 2922 j ==> job_file, 2923 l ==> log_file, 2924 e ==> stderr_file, 2925 q ==> qout_file, 2926 all ==> all files. 
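
Example: ``task.select_files("ol")`` returns the paths of the main output file
and the log file, while ``task.select_files("all")`` returns all of them.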
2927 """ 2928 choices = collections.OrderedDict([ 2929 ("i", self.input_file), 2930 ("o", self.output_file), 2931 ("f", self.files_file), 2932 ("j", self.job_file), 2933 ("l", self.log_file), 2934 ("e", self.stderr_file), 2935 ("q", self.qout_file), 2936 ]) 2937 2938 if what == "all": 2939 return [getattr(v, "path") for v in choices.values()] 2940 2941 selected = [] 2942 for c in what: 2943 try: 2944 selected.append(getattr(choices[c], "path")) 2945 except KeyError: 2946 self.history.warning("Wrong keyword %s" % c) 2947 2948 return selected 2949 2950 def restart(self): 2951 """ 2952 general restart used when scheduler problems have been taken care of 2953 """ 2954 return self._restart() 2955 2956 #@check_spectator 2957 def reset_from_scratch(self): 2958 """ 2959 Restart from scratch, this is to be used if a job is restarted with more resources after a crash 2960 2961 Move output files produced in workdir to _reset otherwise check_status continues 2962 to see the task as crashed even if the job did not run 2963 """ 2964 # Create reset directory if not already done. 2965 reset_dir = os.path.join(self.workdir, "_reset") 2966 reset_file = os.path.join(reset_dir, "_counter") 2967 if not os.path.exists(reset_dir): 2968 os.mkdir(reset_dir) 2969 num_reset = 1 2970 else: 2971 with open(reset_file, "rt") as fh: 2972 num_reset = 1 + int(fh.read()) 2973 2974 # Move files to reset and append digit with reset index. 2975 def move_file(f): 2976 if not f.exists: return 2977 try: 2978 f.move(os.path.join(reset_dir, f.basename + "_" + str(num_reset))) 2979 except OSError as exc: 2980 self.history.warning("Couldn't move file {}. exc: {}".format(f, str(exc))) 2981 2982 for fname in ("output_file", "log_file", "stderr_file", "qout_file", "qerr_file"): 2983 move_file(getattr(self, fname)) 2984 2985 with open(reset_file, "wt") as fh: 2986 fh.write(str(num_reset)) 2987 2988 self.start_lockfile.remove() 2989 2990 # Reset datetimes 2991 self.datetimes.reset() 2992 2993 return self._restart(submit=False) 2994 2995 #@check_spectator 2996 def fix_abicritical(self): 2997 """ 2998 method to fix crashes/error caused by abinit 2999 3000 Returns: 3001 1 if task has been fixed else 0. 3002 """ 3003 event_handlers = self.event_handlers 3004 if not event_handlers: 3005 self.set_status(status=self.S_ERROR, msg='Empty list of event handlers. 
Cannot fix abi_critical errors') 3006 return 0 3007 3008 count, done = 0, len(event_handlers) * [0] 3009 3010 report = self.get_event_report() 3011 if report is None: 3012 self.set_status(status=self.S_ERROR, msg='get_event_report returned None') 3013 return 0 3014 3015 # Note we have loop over all possible events (slow, I know) 3016 # because we can have handlers for Error, Bug or Warning 3017 # (ideally only for CriticalWarnings but this is not done yet) 3018 for event in report: 3019 for i, handler in enumerate(self.event_handlers): 3020 3021 if handler.can_handle(event) and not done[i]: 3022 self.history.info("handler %s will try to fix event %s" % (handler, event)) 3023 try: 3024 d = handler.handle_task_event(self, event) 3025 if d: 3026 done[i] += 1 3027 count += 1 3028 3029 except Exception as exc: 3030 self.history.critical(str(exc)) 3031 3032 if count: 3033 self.reset_from_scratch() 3034 return 1 3035 3036 self.set_status(status=self.S_ERROR, msg='We encountered AbiCritical events that could not be fixed') 3037 return 0 3038 3039 #@check_spectator 3040 def fix_queue_critical(self): 3041 """ 3042 This function tries to fix critical events originating from the queue submission system. 3043 3044 General strategy, first try to increase resources in order to fix the problem, 3045 if this is not possible, call a task specific method to attempt to decrease the demands. 3046 3047 Returns: 3048 1 if task has been fixed else 0. 3049 """ 3050 from abipy.flowtk.scheduler_error_parsers import NodeFailureError, MemoryCancelError, TimeCancelError 3051 #assert isinstance(self.manager, TaskManager) 3052 3053 self.history.info('fixing queue critical') 3054 ret = "task.fix_queue_critical: " 3055 3056 if not self.queue_errors: 3057 # TODO 3058 # paral_kgb = 1 leads to nasty sigegv that are seen as Qcritical errors! 3059 # Try to fallback to the conjugate gradient. 3060 #if self.uses_paral_kgb(1): 3061 # self.history.critical("QCRITICAL with PARAL_KGB==1. 
Will try CG!") 3062 # self.set_vars(paral_kgb=0) 3063 # self.reset_from_scratch() 3064 # return 3065 # queue error but no errors detected, try to solve by increasing ncpus if the task scales 3066 # if resources are at maximum the task is definitively turned to errored 3067 if self.mem_scales or self.load_scales: 3068 try: 3069 self.manager.increase_resources() # acts either on the policy or on the qadapter 3070 self.reset_from_scratch() 3071 ret += "increased resources" 3072 return ret 3073 except ManagerIncreaseError: 3074 self.set_status(self.S_ERROR, msg='unknown queue error, could not increase resources any further') 3075 raise FixQueueCriticalError 3076 else: 3077 self.set_status(self.S_ERROR, msg='unknown queue error, no options left') 3078 raise FixQueueCriticalError 3079 3080 else: 3081 print("Fix_qcritical: received %d queue_errors" % len(self.queue_errors)) 3082 print("type_list: %s" % list(type(qe) for qe in self.queue_errors)) 3083 3084 for error in self.queue_errors: 3085 self.history.info('fixing: %s' % str(error)) 3086 ret += str(error) 3087 if isinstance(error, NodeFailureError): 3088 # if the problematic node is known, exclude it 3089 if error.nodes is not None: 3090 try: 3091 self.manager.exclude_nodes(error.nodes) 3092 self.reset_from_scratch() 3093 self.set_status(self.S_READY, msg='excluding nodes') 3094 except Exception: 3095 raise FixQueueCriticalError 3096 else: 3097 self.set_status(self.S_ERROR, msg='Node error but no node identified.') 3098 raise FixQueueCriticalError 3099 3100 elif isinstance(error, MemoryCancelError): 3101 # ask the qadapter to provide more resources, i.e. more cpu's so more total memory if the code 3102 # scales this should fix the memeory problem 3103 # increase both max and min ncpu of the autoparalel and rerun autoparalel 3104 if self.mem_scales: 3105 try: 3106 self.manager.increase_ncpus() 3107 self.reset_from_scratch() 3108 self.set_status(self.S_READY, msg='increased ncps to solve memory problem') 3109 return 3110 except ManagerIncreaseError: 3111 self.history.warning('increasing ncpus failed') 3112 3113 # if the max is reached, try to increase the memory per cpu: 3114 try: 3115 self.manager.increase_mem() 3116 self.reset_from_scratch() 3117 self.set_status(self.S_READY, msg='increased mem') 3118 return 3119 except ManagerIncreaseError: 3120 self.history.warning('increasing mem failed') 3121 3122 # if this failed ask the task to provide a method to reduce the memory demand 3123 try: 3124 self.reduce_memory_demand() 3125 self.reset_from_scratch() 3126 self.set_status(self.S_READY, msg='decreased mem demand') 3127 return 3128 except DecreaseDemandsError: 3129 self.history.warning('decreasing demands failed') 3130 3131 msg = ('Memory error detected but the memory could not be increased neither could the\n' 3132 'memory demand be decreased. 
Unrecoverable error.') 3133 self.set_status(self.S_ERROR, msg) 3134 raise FixQueueCriticalError 3135 3136 elif isinstance(error, TimeCancelError): 3137 # ask the qadapter to provide more time 3138 print('trying to increase time') 3139 try: 3140 self.manager.increase_time() 3141 self.reset_from_scratch() 3142 self.set_status(self.S_READY, msg='increased wall time') 3143 return 3144 except ManagerIncreaseError: 3145 self.history.warning('increasing the waltime failed') 3146 3147 # if this fails ask the qadapter to increase the number of cpus 3148 if self.load_scales: 3149 try: 3150 self.manager.increase_ncpus() 3151 self.reset_from_scratch() 3152 self.set_status(self.S_READY, msg='increased number of cpus') 3153 return 3154 except ManagerIncreaseError: 3155 self.history.warning('increase ncpus to speed up the calculation to stay in the walltime failed') 3156 3157 # if this failed ask the task to provide a method to speed up the task 3158 try: 3159 self.speed_up() 3160 self.reset_from_scratch() 3161 self.set_status(self.S_READY, msg='task speedup') 3162 return 3163 except DecreaseDemandsError: 3164 self.history.warning('decreasing demands failed') 3165 3166 msg = ('Time cancel error detected but the time could not be increased neither could\n' 3167 'the time demand be decreased by speedup of increasing the number of cpus.\n' 3168 'Unrecoverable error.') 3169 self.set_status(self.S_ERROR, msg) 3170 3171 else: 3172 msg = 'No solution provided for error %s. Unrecoverable error.' % error.name 3173 self.set_status(self.S_ERROR, msg) 3174 3175 return 0 3176 3177 def parse_timing(self): 3178 """ 3179 Parse the timer data in the main output file of Abinit. 3180 Requires timopt /= 0 in the input file (usually timopt = -1) 3181 3182 Return: :class:`AbinitTimerParser` instance, None if error. 3183 """ 3184 3185 parser = AbinitTimerParser() 3186 read_ok = parser.parse(self.output_file.path) 3187 if read_ok: 3188 return parser 3189 return None 3190 3191 3192class ProduceHist(object): 3193 """ 3194 Mixin class for a |Task| producing a HIST file. 3195 Provide the method `open_hist` that reads and return a HIST file. 3196 """ 3197 @property 3198 def hist_path(self): 3199 """Absolute path of the HIST file. Empty string if file is not present.""" 3200 # Lazy property to avoid multiple calls to has_abiext. 3201 try: 3202 return self._hist_path 3203 except AttributeError: 3204 path = self.outdir.has_abiext("HIST") 3205 if path: self._hist_path = path 3206 return path 3207 3208 def open_hist(self): 3209 """ 3210 Open the HIST file located in the in self.outdir. 3211 Returns |HistFile| object, None if file could not be found or file is not readable. 3212 """ 3213 if not self.hist_path: 3214 if self.status == self.S_OK: 3215 self.history.critical("%s reached S_OK but didn't produce a HIST file in %s" % (self, self.outdir)) 3216 return None 3217 3218 # Open the HIST file 3219 from abipy.dynamics.hist import HistFile 3220 try: 3221 return HistFile(self.hist_path) 3222 except Exception as exc: 3223 self.history.critical("Exception while reading HIST file at %s:\n%s" % (self.hist_path, str(exc))) 3224 return None 3225 3226 3227class GsTask(AbinitTask): 3228 """ 3229 Base class for ground-state tasks. A ground state task produces a GSR file 3230 Provides the method `open_gsr` that reads and returns a GSR file. 3231 """ 3232 @property 3233 def gsr_path(self): 3234 """Absolute path of the GSR file. Empty string if file is not present.""" 3235 # Lazy property to avoid multiple calls to has_abiext. 
3236 try: 3237 return self._gsr_path 3238 except AttributeError: 3239 path = self.outdir.has_abiext("GSR") 3240 if path: self._gsr_path = path 3241 return path 3242 3243 def open_gsr(self): 3244 """ 3245 Open the GSR file located in the in self.outdir. 3246 Returns |GsrFile| object, None if file could not be found or file is not readable. 3247 """ 3248 gsr_path = self.gsr_path 3249 if not gsr_path: 3250 if self.status == self.S_OK: 3251 self.history.critical("%s reached S_OK but didn't produce a GSR file in %s" % (self, self.outdir)) 3252 return None 3253 3254 # Open the GSR file. 3255 from abipy.electrons.gsr import GsrFile 3256 try: 3257 return GsrFile(gsr_path) 3258 except Exception as exc: 3259 self.history.critical("Exception while reading GSR file at %s:\n%s" % (gsr_path, str(exc))) 3260 return None 3261 3262 3263class ScfTask(GsTask): 3264 """ 3265 Self-consistent ground-state calculations. 3266 Provide support for in-place restart via (WFK|DEN) files 3267 """ 3268 CRITICAL_EVENTS = [ 3269 events.ScfConvergenceWarning, 3270 ] 3271 3272 color_rgb = np.array((255, 0, 0)) / 255 3273 3274 def restart(self): 3275 """SCF calculations can be restarted if we have either the WFK file or the DEN file.""" 3276 # Prefer WFK over DEN files since we can reuse the wavefunctions. 3277 for ext in ("WFK", "DEN"): 3278 restart_file = self.outdir.has_abiext(ext) 3279 irdvars = irdvars_for_ext(ext) 3280 if restart_file: break 3281 else: 3282 raise self.RestartError("%s: Cannot find WFK or DEN file to restart from." % self) 3283 3284 # Move out --> in. 3285 self.out_to_in(restart_file) 3286 3287 # Add the appropriate variable for restarting. 3288 self.set_vars(irdvars) 3289 3290 # Now we can resubmit the job. 3291 self.history.info("Will restart from %s", restart_file) 3292 return self._restart() 3293 3294 def inspect(self, **kwargs): 3295 """ 3296 Plot the SCF cycle results with matplotlib. 3297 3298 Returns: |matplotlib-Figure| or None if some error occurred. 3299 """ 3300 try: 3301 scf_cycle = abiinspect.GroundStateScfCycle.from_file(self.output_file.path) 3302 except IOError: 3303 return None 3304 3305 if scf_cycle is not None: 3306 if "title" not in kwargs: kwargs["title"] = str(self) 3307 return scf_cycle.plot(**kwargs) 3308 3309 return None 3310 3311 def get_results(self, **kwargs): 3312 results = super().get_results(**kwargs) 3313 3314 # Open the GSR file and add its data to results.out 3315 with self.open_gsr() as gsr: 3316 results["out"].update(gsr.as_dict()) 3317 # Add files to GridFS 3318 results.register_gridfs_files(GSR=gsr.filepath) 3319 3320 return results 3321 3322 3323class CollinearThenNonCollinearScfTask(ScfTask): 3324 """ 3325 A specialized ScfTaks that performs an initial SCF run with nsppol = 2. 3326 The spin polarized WFK file is then used to start a non-collinear SCF run (nspinor == 2) 3327 initialized from the previous WFK file. 3328 """ 3329 def __init__(self, input, workdir=None, manager=None, deps=None): 3330 super().__init__(input, workdir=workdir, manager=manager, deps=deps) 3331 # Enforce nspinor = 1, nsppol = 2 and prtwf = 1. 
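# Two-step scheme (see _on_ok below): the first run is spin-polarized (nsppol = 2);
# once it reaches S_OK, the input is switched to non-collinear mode (nspinor = 2)
# and the task is restarted from the spin-polarized WFK file.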
3332 self._input = self.input.deepcopy() 3333 self.input.set_spin_mode("polarized") 3334 self.input.set_vars(prtwf=1) 3335 self.collinear_done = False 3336 3337 def _on_ok(self): 3338 results = super()._on_ok() 3339 if not self.collinear_done: 3340 self.input.set_spin_mode("spinor") 3341 self.collinear_done = True 3342 self.finalized = False 3343 self.restart() 3344 3345 return results 3346 3347 3348class NscfTask(GsTask): 3349 """ 3350 Non-Self-consistent GS calculation. Provide in-place restart via WFK files 3351 """ 3352 CRITICAL_EVENTS = [ 3353 events.NscfConvergenceWarning, 3354 ] 3355 3356 color_rgb = np.array((200, 80, 100)) / 255 3357 3358 def setup(self): 3359 """ 3360 NSCF calculations should use the same FFT mesh as the one employed in the GS task 3361 (in principle, it's possible to interpolate inside Abinit but tests revealed some numerical noise 3362 Here we change the input file of the NSCF task to have the same FFT mesh. 3363 """ 3364 for dep in self.deps: 3365 if "DEN" in dep.exts: 3366 parent_task = dep.node 3367 break 3368 else: 3369 raise RuntimeError("Cannot find parent node producing DEN file") 3370 3371 with parent_task.open_gsr() as gsr: 3372 if hasattr(gsr, "reader"): 3373 den_mesh = gsr.reader.read_ngfft3() 3374 else: 3375 den_mesh = None 3376 self.history.warning("Cannot read ngfft3 from file. Likely Fortran file!") 3377 3378 if self.ispaw: 3379 self.set_vars(ngfftdg=den_mesh) 3380 else: 3381 self.set_vars(ngfft=den_mesh) 3382 3383 super().setup() 3384 3385 def restart(self): 3386 """NSCF calculations can be restarted only if we have the WFK file.""" 3387 ext = "WFK" 3388 restart_file = self.outdir.has_abiext(ext) 3389 if not restart_file: 3390 raise self.RestartError("%s: Cannot find the WFK file to restart from." % self) 3391 3392 # Move out --> in. 3393 self.out_to_in(restart_file) 3394 3395 # Add the appropriate variable for restarting. 3396 irdvars = irdvars_for_ext(ext) 3397 self.set_vars(irdvars) 3398 3399 # Now we can resubmit the job. 3400 self.history.info("Will restart from %s", restart_file) 3401 return self._restart() 3402 3403 def get_results(self, **kwargs): 3404 results = super().get_results(**kwargs) 3405 3406 # Read the GSR file. 3407 with self.open_gsr() as gsr: 3408 results["out"].update(gsr.as_dict()) 3409 # Add files to GridFS 3410 results.register_gridfs_files(GSR=gsr.filepath) 3411 3412 return results 3413 3414 3415class RelaxTask(GsTask, ProduceHist): 3416 """ 3417 Task for structural optimizations. 3418 """ 3419 # TODO possible ScfConvergenceWarning? 3420 CRITICAL_EVENTS = [ 3421 events.RelaxConvergenceWarning, 3422 ] 3423 3424 color_rgb = np.array((255, 61, 255)) / 255 3425 3426 def get_final_structure(self): 3427 """Read the final structure from the GSR file.""" 3428 try: 3429 with self.open_gsr() as gsr: 3430 return gsr.structure 3431 except AttributeError: 3432 raise RuntimeError("Cannot find the GSR file with the final structure to restart from.") 3433 3434 def restart(self): 3435 """ 3436 Restart the structural relaxation. 3437 3438 Structure relaxations can be restarted only if we have the WFK file or the DEN or the GSR file 3439 from which we can read the last structure (mandatory) and the wavefunctions (not mandatory but useful). 3440 Prefer WFK over other files since we can reuse the wavefunctions. 3441 3442 .. 
note:: 3443 3444 The problem in the present approach is that some parameters in the input 3445 are computed from the initial structure and may not be consistent with 3446 the modification of the structure done during the structure relaxation. 3447 """ 3448 restart_file = None 3449 3450 # Try to restart from the WFK file if possible. 3451 # FIXME: This part has been disabled because WFK=IO is a mess if paral_kgb == 1 3452 # This is also the reason why I wrote my own MPI-IO code for the GW part! 3453 wfk_file = self.outdir.has_abiext("WFK") 3454 if False and wfk_file: 3455 irdvars = irdvars_for_ext("WFK") 3456 restart_file = self.out_to_in(wfk_file) 3457 3458 # Fallback to DEN file. Note that here we look for out_DEN instead of out_TIM?_DEN 3459 # This happens when the previous run completed and task.on_done has been performed. 3460 # ******************************************************************************** 3461 # Note that it's possible to have an undetected error if we have multiple restarts 3462 # and the last relax died badly. In this case indeed out_DEN is the file produced 3463 # by the last run that has executed on_done. 3464 # ******************************************************************************** 3465 if restart_file is None: 3466 for ext in ("", ".nc"): 3467 out_den = self.outdir.path_in("out_DEN" + ext) 3468 if os.path.exists(out_den): 3469 irdvars = irdvars_for_ext("DEN") 3470 restart_file = self.out_to_in(out_den) 3471 break 3472 3473 if restart_file is None: 3474 # Try to restart from the last TIM?_DEN file. 3475 # This should happen if the previous run didn't complete in clean way. 3476 # Find the last TIM?_DEN file. 3477 last_timden = self.outdir.find_last_timden_file() 3478 if last_timden is not None: 3479 if last_timden.path.endswith(".nc"): 3480 ofile = self.outdir.path_in("out_DEN.nc") 3481 else: 3482 ofile = self.outdir.path_in("out_DEN") 3483 os.rename(last_timden.path, ofile) 3484 restart_file = self.out_to_in(ofile) 3485 irdvars = irdvars_for_ext("DEN") 3486 3487 if restart_file is None: 3488 # Don't raise RestartError as we can still change the structure. 3489 self.history.warning("Cannot find the WFK|DEN|TIM?_DEN file to restart from.") 3490 else: 3491 # Add the appropriate variable for restarting. 3492 self.set_vars(irdvars) 3493 self.history.info("Will restart from %s", restart_file) 3494 3495 # FIXME Here we should read the HIST file but restartxf if broken! 3496 #self.set_vars({"restartxf": -1}) 3497 3498 # Read the relaxed structure from the GSR file and change the input. 3499 self._change_structure(self.get_final_structure()) 3500 3501 # Now we can resubmit the job. 3502 return self._restart() 3503 3504 def inspect(self, **kwargs): 3505 """ 3506 Plot the evolution of the structural relaxation with matplotlib. 3507 3508 Args: 3509 what: Either "hist" or "scf". The first option (default) extracts data 3510 from the HIST file and plot the evolution of the structural 3511 parameters, forces, pressures and energies. 3512 The second option, extracts data from the main output file and 3513 plot the evolution of the SCF cycles (etotal, residuals, etc). 3514 3515 Returns: |matplotlib-Figure| or None if some error occurred. 3516 """ 3517 what = kwargs.pop("what", "hist") 3518 3519 if what == "hist": 3520 # Read the hist file to get access to the structure. 
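            # open_hist returns None if the HIST file is missing or unreadable,
            # hence the `if hist else None` guard below.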
            with self.open_hist() as hist:
                return hist.plot(**kwargs) if hist else None

        elif what == "scf":
            # Get info on the different SCF cycles
            relaxation = abiinspect.Relaxation.from_file(self.output_file.path)
            if "title" not in kwargs: kwargs["title"] = str(self)
            return relaxation.plot(**kwargs) if relaxation is not None else None

        else:
            raise ValueError("Wrong value for `what`: %s" % what)

    def get_results(self, **kwargs):
        results = super().get_results(**kwargs)

        # Open the GSR file and add its data to results.out
        with self.open_gsr() as gsr:
            results["out"].update(gsr.as_dict())
            # Add files to GridFS
            results.register_gridfs_files(GSR=gsr.filepath)

        return results

    def reduce_dilatmx(self, target=1.01):
        actual_dilatmx = self.get_inpvar('dilatmx', 1.)
        new_dilatmx = actual_dilatmx - min((actual_dilatmx-target), actual_dilatmx*0.05)
        self.set_vars(dilatmx=new_dilatmx)

    def fix_ofiles(self):
        """
        Note that ABINIT produces lots of out_TIM?_DEN files, one for each relaxation step.
        Here we list all the TIM*_DEN files, select the last one and rename it to out_DEN.

        This change is needed so that we can specify dependencies with the syntax {node: "DEN"}
        without having to know the number of iterations needed to converge the run in node!
        """
        super().fix_ofiles()

        # Find the last TIM?_DEN file.
        last_timden = self.outdir.find_last_timden_file()
        if last_timden is None:
            self.history.warning("Cannot find TIM?_DEN files")
            return

        # Rename the last TIM?_DEN file to out_DEN.
        ofile = self.outdir.path_in("out_DEN")
        if last_timden.path.endswith(".nc"): ofile += ".nc"
        self.history.info("Renaming last_denfile %s --> %s" % (last_timden.path, ofile))
        os.rename(last_timden.path, ofile)


class DfptTask(AbinitTask):
    """
    Base class for DFPT tasks (PhononTask, DdeTask, DdkTask, ElasticTask ...)
    Mainly used to implement methods that are common to DFPT calculations with Abinit.
    Provides the method `open_ddb` that reads and returns a DDB file.

    .. warning::

        This class should not be instantiated directly.
    """
    # TODO:
    # for the time being we don't distinguish between GS and phonon calculations.
    CRITICAL_EVENTS = [
        events.ScfConvergenceWarning,
    ]

    def __repr__(self):
        # Get info about the DFPT perturbation from the input file.
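        # Illustrative example of the string returned below (the values are made up):
        #   <PhononTask, node_id=123, workdir=w0/t1, qpt: [0.5, 0.0, 0.0], rfphon: 1, rfatpol: [1, 2], rfdir: [1, 1, 1], irdddk: 0>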
        qpt = self.input.get("qpt", [0, 0, 0])
        rfphon = self.input.get("rfphon", 0)
        rfatpol = self.input.get("rfatpol", [1, 1])
        rfelfd = self.input.get("rfelfd", 0)
        rfstrs = self.input.get("rfstrs", 0)
        rfdir = self.input.get("rfdir", [0, 0, 0])
        irdddk = self.input.get("irdddk", 0)

        dfpt_info = ""
        if rfphon != 0:
            dfpt_info = "qpt: {}, rfphon: {}, rfatpol: {}, rfdir: {}, irdddk: {}".format(
                qpt, rfphon, rfatpol, rfdir, irdddk)

        elif rfelfd != 0:
            dfpt_info = "qpt: {}, rfelfd: {}, rfdir: {}, irdddk: {}".format(
                qpt, rfelfd, rfdir, irdddk)

        elif rfstrs != 0:
            dfpt_info = "qpt: {}, rfstrs: {}, rfdir: {}, irdddk: {}".format(
                qpt, rfstrs, rfdir, irdddk)

        try:
            return "<%s, node_id=%s, workdir=%s, %s>" % (
                self.__class__.__name__, self.node_id, self.relworkdir, dfpt_info)
        except AttributeError:
            # This usually happens when workdir has not been initialized.
            return "<%s, node_id=%s, workdir=None, %s>" % (
                self.__class__.__name__, self.node_id, dfpt_info)

    @property
    def ddb_path(self):
        """Absolute path of the DDB file. Empty string if file is not present."""
        # Lazy property to avoid multiple calls to has_abiext.
        try:
            return self._ddb_path
        except AttributeError:
            path = self.outdir.has_abiext("DDB")
            if path: self._ddb_path = path
            return path

    def open_ddb(self):
        """
        Open the DDB file located in self.outdir.
        Returns a |DdbFile| object, None if the file could not be found or is not readable.
        """
        ddb_path = self.ddb_path
        if not ddb_path:
            if self.status == self.S_OK:
                self.history.critical("%s reached S_OK but didn't produce a DDB file in %s" % (self, self.outdir))
            return None

        # Open the DDB file.
        from abipy.dfpt.ddb import DdbFile
        try:
            return DdbFile(ddb_path)
        except Exception as exc:
            self.history.critical("Exception while reading DDB file at %s:\n%s" % (ddb_path, str(exc)))
            return None

    def make_links(self):
        """
        Replace the default behaviour of make_links. More specifically, this method
        implements the logic required to connect a DFPT calculation to the `DDK` files.
        Remember that DDK is an extension introduced in AbiPy to deal with the
        irdddk input variable and the fact that the three du/dk files produced by Abinit
        have a file extension constructed from the number of atoms (e.g. 1WF[3*natom + idir]).

        AbiPy uses the user-friendly syntax deps={node: "DDK"} to specify that
        the children will read the DDK from `node`, but this also means that
        we have to implement extra logic to handle this case at runtime.
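
        Example (illustrative; the node names are placeholders):

            # Connect a BecTask to one ScfTask and three DdkTask nodes:
            #
            #    deps = {ddk_task: "DDK" for ddk_task in ddk_tasks}
            #    deps.update({scf_task: "WFK"})
            #
            # At runtime, this method resolves "DDK" to the out_DDK file of the parent task
            # and symlinks it to in_1WF[3*natom + idir], the filename expected by Abinit.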
3660 """ 3661 for dep in self.deps: 3662 for d in dep.exts: 3663 3664 if d == "DDK": 3665 ddk_task = dep.node 3666 out_ddk = ddk_task.outdir.has_abiext("DDK") 3667 if not out_ddk: 3668 raise RuntimeError("%s didn't produce the DDK file" % ddk_task) 3669 3670 # Get (fortran) idir and costruct the name of the 1WF expected by Abinit 3671 rfdir = list(ddk_task.input["rfdir"]) 3672 if rfdir.count(1) != 1: 3673 raise RuntimeError("Only one direction should be specifned in rfdir but rfdir = %s" % rfdir) 3674 3675 idir = rfdir.index(1) + 1 3676 ddk_case = idir + 3 * len(ddk_task.input.structure) 3677 3678 infile = self.indir.path_in("in_1WF%d" % ddk_case) 3679 if out_ddk.endswith(".nc"): infile = infile + ".nc" 3680 os.symlink(out_ddk, infile) 3681 3682 elif d in ("WFK", "WFQ"): 3683 gs_task = dep.node 3684 out_wfk = gs_task.outdir.has_abiext(d) 3685 if not out_wfk: 3686 raise RuntimeError("%s didn't produce the %s file" % (gs_task, d)) 3687 3688 if d == "WFK": 3689 bname = "in_WFK" 3690 elif d == "WFQ": 3691 bname = "in_WFQ" 3692 else: 3693 raise ValueError("Don't know how to handle `%s`" % d) 3694 3695 # Ensure link has .nc extension if iomode 3 3696 if out_wfk.endswith(".nc"): bname = bname + ".nc" 3697 #print(d, out_wfk, "bname", bname) 3698 if not os.path.exists(self.indir.path_in(bname)): 3699 os.symlink(out_wfk, self.indir.path_in(bname)) 3700 3701 elif d == "DEN": 3702 gs_task = dep.node 3703 out_wfk = gs_task.outdir.has_abiext("DEN") 3704 if not out_wfk: 3705 raise RuntimeError("%s didn't produce the DEN file" % gs_task) 3706 infile = self.indir.path_in("in_DEN") 3707 if out_wfk.endswith(".nc"): infile = infile + ".nc" 3708 if not os.path.exists(infile): 3709 os.symlink(out_wfk, infile) 3710 3711 elif d == "1WF": 3712 gs_task = dep.node 3713 out_wfk = gs_task.outdir.has_abiext("1WF") 3714 if not out_wfk: 3715 raise RuntimeError("%s didn't produce the 1WF file" % gs_task) 3716 dest = self.indir.path_in("in_" + out_wfk.split("_")[-1]) 3717 if out_wfk.endswith(".nc"): dest = dest + ".nc" 3718 if not os.path.exists(dest): 3719 os.symlink(out_wfk, dest) 3720 3721 elif d == "1DEN": 3722 gs_task = dep.node 3723 out_wfk = gs_task.outdir.has_abiext("DEN") 3724 if not out_wfk: 3725 raise RuntimeError("%s didn't produce the 1DEN file" % gs_task) 3726 dest = self.indir.path_in("in_" + out_wfk.split("_")[-1]) 3727 if out_wfk.endswith(".nc"): dest = dest + ".nc" 3728 if not os.path.exists(dest): 3729 os.symlink(out_wfk, dest) 3730 3731 else: 3732 raise ValueError("Don't know how to handle extension: %s" % str(dep.exts)) 3733 3734 def restart(self): 3735 """ 3736 DFPT calculations can be restarted only if we have the 1WF file or the 1DEN file. 3737 from which we can read the first-order wavefunctions or the first order density. 3738 Prefer 1WF over 1DEN since we can reuse the wavefunctions. 3739 """ 3740 # Abinit adds the idir-ipert index at the end of the file and this breaks the extension 3741 # e.g. out_1WF4, out_DEN4. find_1wf_files and find_1den_files returns the list of files found 3742 restart_file, irdvars = None, None 3743 3744 # Highest priority to the 1WF file because restart is more efficient. 3745 wf_files = self.outdir.find_1wf_files() 3746 if wf_files is not None: 3747 restart_file = wf_files[0].path 3748 irdvars = irdvars_for_ext("1WF") 3749 if len(wf_files) != 1: 3750 restart_file = None 3751 self.history.critical("Found more than one 1WF file in outdir. 
Restart is ambiguous!")

        if restart_file is None:
            den_files = self.outdir.find_1den_files()
            if den_files is not None:
                restart_file = den_files[0].path
                irdvars = {"ird1den": 1}
                if len(den_files) != 1:
                    restart_file = None
                    self.history.critical("Found more than one 1DEN file in outdir. Restart is ambiguous!")

        if restart_file is None:
            # Raise because otherwise restart is equivalent to a run from scratch --> infinite loop!
            raise self.RestartError("%s: Cannot find the 1WF|1DEN file to restart from." % self)

        # Move file.
        self.history.info("Will restart from %s", restart_file)
        restart_file = self.out_to_in(restart_file)

        # Add the appropriate variable for restarting.
        self.set_vars(irdvars)

        # Now we can resubmit the job.
        return self._restart()


class DdeTask(DfptTask):
    """Task for DDE calculations (perturbation wrt the electric field)."""

    color_rgb = np.array((61, 158, 255)) / 255

    def get_results(self, **kwargs):
        results = super().get_results(**kwargs)
        return results.register_gridfs_file(DDB=(self.outdir.has_abiext("DDE"), "t"))


class DteTask(DfptTask):
    """Task for DTE calculations."""
    color_rgb = np.array((204, 0, 204)) / 255

    # @check_spectator
    def start(self, **kwargs):
        kwargs['autoparal'] = False
        return super().start(**kwargs)

    def get_results(self, **kwargs):
        results = super().get_results(**kwargs)
        return results.register_gridfs_file(DDB=(self.outdir.has_abiext("DDE"), "t"))


class DdkTask(DfptTask):
    """Task for DDK calculations."""
    color_rgb = np.array((0, 204, 204)) / 255

    #@check_spectator
    def _on_ok(self):
        super()._on_ok()
        # Client code expects to find du/dk in the DDK file.
        # Here I create a symbolic link out_1WF13 --> out_DDK
        # so that we can use deps={ddk_task: "DDK"} in the high-level API.
        # The price to pay is that we have to handle the DDK extension in make_links.
        # See DfptTask.make_links.
        self.outdir.symlink_abiext('1WF', 'DDK')

    def get_results(self, **kwargs):
        results = super().get_results(**kwargs)
        return results.register_gridfs_file(DDK=(self.outdir.has_abiext("DDK"), "t"))


class BecTask(DfptTask):
    """
    Task for the calculation of Born effective charges.

    bec_deps = {ddk_task: "DDK" for ddk_task in ddk_tasks}
    bec_deps.update({scf_task: "WFK"})
    """
    color_rgb = np.array((122, 122, 255)) / 255


class EffMassTask(DfptTask):
    """Task for effective mass calculations with DFPT."""
    color_rgb = np.array((0, 122, 204)) / 255


class PhononTask(DfptTask):
    """
    DFPT calculations for a single atomic perturbation.
    Provides support for in-place restart via (1WF|1DEN) files.
    """
    color_rgb = np.array((0, 150, 250)) / 255

    def inspect(self, **kwargs):
        """
        Plot the Phonon SCF cycle results with matplotlib.

        Returns: |matplotlib-Figure| or None if some error occurred.
        """
        scf_cycle = abiinspect.PhononScfCycle.from_file(self.output_file.path)
        if scf_cycle is not None:
            if "title" not in kwargs: kwargs["title"] = str(self)
            return scf_cycle.plot(**kwargs)

    def get_results(self, **kwargs):
        results = super().get_results(**kwargs)
        return results.register_gridfs_files(DDB=(self.outdir.has_abiext("DDB"), "t"))


class ElasticTask(DfptTask):
    """
    DFPT calculations for a single strain perturbation (uniaxial or shear strain).
    Provides support for in-place restart via (1WF|1DEN) files.
    """
    color_rgb = np.array((255, 204, 255)) / 255


class EphTask(AbinitTask):
    """
    Class for electron-phonon calculations.
    """
    color_rgb = np.array((255, 128, 0)) / 255


class ManyBodyTask(AbinitTask):
    """
    Base class for Many-body tasks (Screening, Sigma, Bethe-Salpeter).
    Mainly used to implement methods that are common to MBPT calculations with Abinit.

    .. warning::

        This class should not be instantiated directly.
    """
    def reduce_memory_demand(self):
        """
        Method that can be called by the scheduler to decrease the memory demand of a specific task.
        Returns True in case of success, False in case of failure.
        """
        # The first digit governs the storage of W(q), the second digit the storage of u(r).
        # Try to avoid the storage of u(r) first since reading W(q) from file leads to a dramatic slowdown.
        prev_gwmem = int(self.get_inpvar("gwmem", default=11))
        first_dig, second_dig = prev_gwmem // 10, prev_gwmem % 10

        if second_dig == 1:
            self.set_vars(gwmem="%.2d" % (10 * first_dig))
            return True

        if first_dig == 1:
            self.set_vars(gwmem="%.2d" % 00)
            return True

        # gwmem is already 00: nothing left to release.
        return False


class ScrTask(ManyBodyTask):
    """Tasks for SCREENING calculations."""

    color_rgb = np.array((255, 128, 0)) / 255

    @property
    def scr_path(self):
        """Absolute path of the SCR file. Empty string if file is not present."""
        # Lazy property to avoid multiple calls to has_abiext.
        try:
            return self._scr_path
        except AttributeError:
            path = self.outdir.has_abiext("SCR.nc")
            if path: self._scr_path = path
            return path

    def open_scr(self):
        """
        Open the SCR file located in self.outdir.
        Returns |ScrFile| object, None if the file could not be found or is not readable.
        """
        scr_path = self.scr_path

        if not scr_path:
            self.history.critical("%s didn't produce a SCR.nc file in %s" % (self, self.outdir))
            return None

        # Open the SCR file.
        from abipy.electrons.scr import ScrFile
        try:
            return ScrFile(scr_path)
        except Exception as exc:
            self.history.critical("Exception while reading SCR file at %s:\n%s" % (scr_path, str(exc)))
            return None


class SigmaTask(ManyBodyTask):
    """
    Tasks for SIGMA calculations. Provides support for in-place restart via QPS files.
    """
    CRITICAL_EVENTS = [
        events.QPSConvergenceWarning,
    ]

    color_rgb = np.array((0, 255, 0)) / 255

    def restart(self):
        # Sigma calculations can be restarted only if we have the QPS file
        # from which we can read the results of the previous step.
3953 ext = "QPS" 3954 restart_file = self.outdir.has_abiext(ext) 3955 if not restart_file: 3956 raise self.RestartError("%s: Cannot find the QPS file to restart from." % self) 3957 3958 self.out_to_in(restart_file) 3959 3960 # Add the appropriate variable for restarting. 3961 irdvars = irdvars_for_ext(ext) 3962 self.set_vars(irdvars) 3963 3964 # Now we can resubmit the job. 3965 self.history.info("Will restart from %s", restart_file) 3966 return self._restart() 3967 3968 #def inspect(self, **kwargs): 3969 # """Plot graph showing the number of k-points computed and the wall-time used""" 3970 3971 @property 3972 def sigres_path(self): 3973 """Absolute path of the SIGRES file. Empty string if file is not present.""" 3974 # Lazy property to avoid multiple calls to has_abiext. 3975 try: 3976 return self._sigres_path 3977 except AttributeError: 3978 path = self.outdir.has_abiext("SIGRES") 3979 if path: self._sigres_path = path 3980 return path 3981 3982 def open_sigres(self): 3983 """ 3984 Open the SIGRES file located in the in self.outdir. 3985 Returns |SigresFile| object, None if file could not be found or file is not readable. 3986 """ 3987 sigres_path = self.sigres_path 3988 3989 if not sigres_path: 3990 self.history.critical("%s didn't produce a SIGRES file in %s" % (self, self.outdir)) 3991 return None 3992 3993 # Open the SIGRES file and add its data to results.out 3994 from abipy.electrons.gw import SigresFile 3995 try: 3996 return SigresFile(sigres_path) 3997 except Exception as exc: 3998 self.history.critical("Exception while reading SIGRES file at %s:\n%s" % (sigres_path, str(exc))) 3999 return None 4000 4001 def get_scissors_builder(self): 4002 """ 4003 Returns an instance of :class:`ScissorsBuilder` from the SIGRES file. 4004 4005 Raise: 4006 `RuntimeError` if SIGRES file is not found. 4007 """ 4008 from abipy.electrons.scissors import ScissorsBuilder 4009 if self.sigres_path: 4010 return ScissorsBuilder.from_file(self.sigres_path) 4011 else: 4012 raise RuntimeError("Cannot find SIGRES file!") 4013 4014 def get_results(self, **kwargs): 4015 results = super().get_results(**kwargs) 4016 4017 # Open the SIGRES file and add its data to results.out 4018 with self.open_sigres() as sigres: 4019 #results["out"].update(sigres.as_dict()) 4020 results.register_gridfs_files(SIGRES=sigres.filepath) 4021 4022 return results 4023 4024 4025class BseTask(ManyBodyTask): 4026 """ 4027 Task for Bethe-Salpeter calculations. 4028 4029 .. note:: 4030 4031 The BSE codes provides both iterative and direct schemes for the computation of the dielectric function. 4032 The direct diagonalization cannot be restarted whereas Haydock and CG support restarting. 4033 """ 4034 CRITICAL_EVENTS = [ 4035 events.HaydockConvergenceWarning, 4036 #events.BseIterativeDiagoConvergenceWarning, 4037 ] 4038 4039 color_rgb = np.array((128, 0, 255)) / 255 4040 4041 def restart(self): 4042 """ 4043 BSE calculations with Haydock can be restarted only if we have the 4044 excitonic Hamiltonian and the HAYDR_SAVE file. 4045 """ 4046 # TODO: This version seems to work but the main output file is truncated 4047 # TODO: Handle restart if CG method is used 4048 # TODO: restart should receive a list of critical events 4049 # the log file is complete though. 4050 irdvars = {} 4051 4052 # Move the BSE blocks to indata. 4053 # This is done only once at the end of the first run. 
4054 # Successive restarts will use the BSR|BSC files in the indir directory 4055 # to initialize the excitonic Hamiltonian 4056 count = 0 4057 for ext in ("BSR", "BSC"): 4058 ofile = self.outdir.has_abiext(ext) 4059 if ofile: 4060 count += 1 4061 irdvars.update(irdvars_for_ext(ext)) 4062 self.out_to_in(ofile) 4063 4064 if not count: 4065 # outdir does not contain the BSR|BSC file. 4066 # This means that num_restart > 1 and the files should be in task.indir 4067 count = 0 4068 for ext in ("BSR", "BSC"): 4069 ifile = self.indir.has_abiext(ext) 4070 if ifile: 4071 count += 1 4072 4073 if not count: 4074 raise self.RestartError("%s: Cannot find BSR|BSC files in %s" % (self, self.indir)) 4075 4076 # Rename HAYDR_SAVE files 4077 count = 0 4078 for ext in ("HAYDR_SAVE", "HAYDC_SAVE"): 4079 ofile = self.outdir.has_abiext(ext) 4080 if ofile: 4081 count += 1 4082 irdvars.update(irdvars_for_ext(ext)) 4083 self.out_to_in(ofile) 4084 4085 if not count: 4086 raise self.RestartError("%s: Cannot find the HAYDR_SAVE file to restart from." % self) 4087 4088 # Add the appropriate variable for restarting. 4089 self.set_vars(irdvars) 4090 4091 # Now we can resubmit the job. 4092 #self.history.info("Will restart from %s", restart_file) 4093 return self._restart() 4094 4095 #def inspect(self, **kwargs): 4096 # """ 4097 # Plot the Haydock iterations with matplotlib. 4098 # 4099 # Returns: |matplotlib-Figure| or None if some error occurred. 4100 # """ 4101 # haydock_cycle = abiinspect.HaydockIterations.from_file(self.output_file.path) 4102 # if haydock_cycle is not None: 4103 # if "title" not in kwargs: kwargs["title"] = str(self) 4104 # return haydock_cycle.plot(**kwargs) 4105 4106 @property 4107 def mdf_path(self): 4108 """Absolute path of the MDF file. Empty string if file is not present.""" 4109 # Lazy property to avoid multiple calls to has_abiext. 4110 try: 4111 return self._mdf_path 4112 except AttributeError: 4113 path = self.outdir.has_abiext("MDF.nc") 4114 if path: self._mdf_path = path 4115 return path 4116 4117 def open_mdf(self): 4118 """ 4119 Open the MDF file located in the in self.outdir. 4120 Returns |MdfFile| object, None if file could not be found or file is not readable. 4121 """ 4122 mdf_path = self.mdf_path 4123 if not mdf_path: 4124 self.history.critical("%s didn't produce a MDF file in %s" % (self, self.outdir)) 4125 return None 4126 4127 # Open the DFF file and add its data to results.out 4128 from abipy.electrons.bse import MdfFile 4129 try: 4130 return MdfFile(mdf_path) 4131 except Exception as exc: 4132 self.history.critical("Exception while reading MDF file at %s:\n%s" % (mdf_path, str(exc))) 4133 return None 4134 4135 def get_results(self, **kwargs): 4136 results = super().get_results(**kwargs) 4137 4138 with self.open_mdf() as mdf: 4139 #results["out"].update(mdf.as_dict()) 4140 #epsilon_infinity optical_gap 4141 results.register_gridfs_files(MDF=mdf.filepath) 4142 4143 return results 4144 4145 4146class OpticTask(Task): 4147 """ 4148 Task for the computation of optical spectra with optic i.e. 4149 RPA without local-field effects and velocity operator computed from DDK files. 4150 """ 4151 color_rgb = np.array((255, 204, 102)) / 255 4152 4153 def __init__(self, optic_input, nscf_node, ddk_nodes, use_ddknc=False, workdir=None, manager=None): 4154 """ 4155 Create an instance of :class:`OpticTask` from n string containing the input. 4156 4157 Args: 4158 optic_input: :class:`OpticInput` object with optic variables. 
4159 nscf_node: The task that will produce the WFK file with the KS energies or path to the WFK file. 4160 ddk_nodes: List of :class:`DdkTask` nodes that will produce the DDK files or list of DDK filepaths. 4161 Order (x, y, z) 4162 workdir: Path to the working directory. 4163 manager: |TaskManager| object. 4164 """ 4165 # Convert paths to FileNodes 4166 self.nscf_node = Node.as_node(nscf_node) 4167 self.ddk_nodes = [Node.as_node(n) for n in ddk_nodes] 4168 assert len(ddk_nodes) == 3 4169 #print(self.nscf_node, self.ddk_nodes) 4170 4171 # Use DDK extension instead of 1WF 4172 if use_ddknc: 4173 deps = {n: "DDK.nc" for n in self.ddk_nodes} 4174 else: 4175 deps = {n: "1WF" for n in self.ddk_nodes} 4176 4177 deps.update({self.nscf_node: "WFK"}) 4178 4179 super().__init__(optic_input, workdir=workdir, manager=manager, deps=deps) 4180 4181 def set_workdir(self, workdir, chroot=False): 4182 """Set the working directory of the task.""" 4183 super().set_workdir(workdir, chroot=chroot) 4184 # Small hack: the log file of optics is actually the main output file. 4185 self.output_file = self.log_file 4186 4187 def set_vars(self, *args, **kwargs): 4188 """ 4189 Optic does not use `get` or `ird` variables hence we should never try 4190 to change the input when we connect this task 4191 """ 4192 kwargs.update(dict(*args)) 4193 self.history.info("OpticTask intercepted set_vars with args %s" % kwargs) 4194 4195 if "autoparal" in kwargs: self.input.set_vars(autoparal=kwargs["autoparal"]) 4196 if "max_ncpus" in kwargs: self.input.set_vars(max_ncpus=kwargs["max_ncpus"]) 4197 4198 @property 4199 def executable(self): 4200 """Path to the executable required for running the :class:`OpticTask`.""" 4201 try: 4202 return self._executable 4203 except AttributeError: 4204 return "optic" 4205 4206 @property 4207 def filesfile_string(self): 4208 """String with the list of files and prefixes needed to execute ABINIT.""" 4209 lines = [] 4210 app = lines.append 4211 4212 app(self.input_file.path) # Path to the input file 4213 app(os.path.join(self.workdir, "unused")) # Path to the output file 4214 app(os.path.join(self.workdir, self.prefix.odata)) # Prefix for output data 4215 4216 return "\n".join(lines) 4217 4218 @property 4219 def wfk_filepath(self): 4220 """Returns (at runtime) the absolute path of the WFK file produced by the NSCF run.""" 4221 return self.nscf_node.outdir.has_abiext("WFK") 4222 4223 @property 4224 def ddk_filepaths(self): 4225 """Returns (at runtime) the absolute path of the DDK files produced by the DDK runs.""" 4226 # This to support new version of optic that used DDK.nc 4227 paths = [ddk_task.outdir.has_abiext("DDK.nc") for ddk_task in self.ddk_nodes] 4228 if all(p for p in paths): 4229 return paths 4230 4231 # This is deprecated and can be removed when new version of Abinit is released. 4232 return [ddk_task.outdir.has_abiext("1WF") for ddk_task in self.ddk_nodes] 4233 4234 def make_input(self): 4235 """Construct and write the input file of the calculation.""" 4236 # Set the file paths. 4237 all_files = {"ddkfile_" + str(n + 1): ddk for n, ddk in enumerate(self.ddk_filepaths)} 4238 all_files.update({"wfkfile": self.wfk_filepath}) 4239 files_nml = {"FILES": all_files} 4240 files = nmltostring(files_nml) 4241 4242 # Get the input specified by the user 4243 user_file = nmltostring(self.input.as_dict()) 4244 4245 # Join them. 
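        # The string returned below therefore contains the FILES namelist (ddkfile_1..3 and wfkfile)
        # followed by the namelist(s) generated from the user-supplied OpticInput.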
4246 return files + user_file 4247 4248 def setup(self): 4249 """Public method called before submitting the task.""" 4250 4251 def make_links(self): 4252 """ 4253 Optic allows the user to specify the paths of the input file. 4254 hence we don't need to create symbolic links. 4255 """ 4256 4257 def get_results(self, **kwargs): 4258 return super().get_results(**kwargs) 4259 4260 def fix_abicritical(self): 4261 """ 4262 Cannot fix abicritical errors for optic 4263 """ 4264 return 0 4265 4266 #@check_spectator 4267 def reset_from_scratch(self): 4268 """ 4269 restart from scratch, this is to be used if a job is restarted with more resources after a crash 4270 """ 4271 # Move output files produced in workdir to _reset otherwise check_status continues 4272 # to see the task as crashed even if the job did not run 4273 # Create reset directory if not already done. 4274 reset_dir = os.path.join(self.workdir, "_reset") 4275 reset_file = os.path.join(reset_dir, "_counter") 4276 if not os.path.exists(reset_dir): 4277 os.mkdir(reset_dir) 4278 num_reset = 1 4279 else: 4280 with open(reset_file, "rt") as fh: 4281 num_reset = 1 + int(fh.read()) 4282 4283 # Move files to reset and append digit with reset index. 4284 def move_file(f): 4285 if not f.exists: return 4286 try: 4287 f.move(os.path.join(reset_dir, f.basename + "_" + str(num_reset))) 4288 except OSError as exc: 4289 self.history.warning("Couldn't move file {}. exc: {}".format(f, str(exc))) 4290 4291 for fname in ("output_file", "log_file", "stderr_file", "qout_file", "qerr_file", "mpiabort_file"): 4292 move_file(getattr(self, fname)) 4293 4294 with open(reset_file, "wt") as fh: 4295 fh.write(str(num_reset)) 4296 4297 self.start_lockfile.remove() 4298 4299 # Reset datetimes 4300 self.datetimes.reset() 4301 4302 return self._restart(submit=False) 4303 4304 def fix_queue_critical(self): 4305 """ 4306 This function tries to fix critical events originating from the queue submission system. 4307 4308 General strategy, first try to increase resources in order to fix the problem, 4309 if this is not possible, call a task specific method to attempt to decrease the demands. 4310 4311 Returns: 4312 1 if task has been fixed else 0. 4313 """ 4314 from abipy.flowtk.scheduler_error_parsers import NodeFailureError, MemoryCancelError, TimeCancelError 4315 4316 if not self.queue_errors: 4317 if self.mem_scales or self.load_scales: 4318 try: 4319 self.manager.increase_resources() # acts either on the policy or on the qadapter 4320 self.reset_from_scratch() 4321 return 4322 except ManagerIncreaseError: 4323 self.set_status(self.S_ERROR, msg='unknown queue error, could not increase resources any further') 4324 raise FixQueueCriticalError 4325 else: 4326 self.set_status(self.S_ERROR, msg='unknown queue error, no options left') 4327 raise FixQueueCriticalError 4328 4329 else: 4330 for error in self.queue_errors: 4331 self.history.info('fixing: %s' % str(error)) 4332 4333 if isinstance(error, NodeFailureError): 4334 # if the problematic node is known, exclude it 4335 if error.nodes is not None: 4336 try: 4337 self.manager.exclude_nodes(error.nodes) 4338 self.reset_from_scratch() 4339 self.set_status(self.S_READY, msg='excluding nodes') 4340 except Exception: 4341 raise FixQueueCriticalError 4342 else: 4343 self.set_status(self.S_ERROR, msg='Node error but no node identified.') 4344 raise FixQueueCriticalError 4345 4346 elif isinstance(error, MemoryCancelError): 4347 # ask the qadapter to provide more resources, i.e. 
more CPUs and hence more total memory; if the code
                    # scales, this should fix the memory problem.
                    # Increase both max and min ncpus of the autoparal and rerun autoparal.
                    if self.mem_scales:
                        try:
                            self.manager.increase_ncpus()
                            self.reset_from_scratch()
                            self.set_status(self.S_READY, msg='increased ncpus to solve memory problem')
                            return
                        except ManagerIncreaseError:
                            self.history.warning('increasing ncpus failed')

                    # If the max is reached, try to increase the memory per cpu:
                    try:
                        self.manager.increase_mem()
                        self.reset_from_scratch()
                        self.set_status(self.S_READY, msg='increased mem')
                        return
                    except ManagerIncreaseError:
                        self.history.warning('increasing mem failed')

                    # If this failed, ask the task to provide a method to reduce the memory demand.
                    try:
                        self.reduce_memory_demand()
                        self.reset_from_scratch()
                        self.set_status(self.S_READY, msg='decreased mem demand')
                        return
                    except DecreaseDemandsError:
                        self.history.warning('decreasing demands failed')

                    msg = ('Memory error detected but the memory could not be increased, nor could the\n'
                           'memory demand be decreased. Unrecoverable error.')
                    self.set_status(self.S_ERROR, msg)
                    raise FixQueueCriticalError

                elif isinstance(error, TimeCancelError):
                    # Ask the qadapter to provide more time.
                    try:
                        self.manager.increase_time()
                        self.reset_from_scratch()
                        self.set_status(self.S_READY, msg='increased wall time')
                        return
                    except ManagerIncreaseError:
                        self.history.warning('increasing the walltime failed')

                    # If this fails, ask the qadapter to increase the number of cpus.
                    if self.load_scales:
                        try:
                            self.manager.increase_ncpus()
                            self.reset_from_scratch()
                            self.set_status(self.S_READY, msg='increased number of cpus')
                            return
                        except ManagerIncreaseError:
                            self.history.warning('increasing ncpus to speed up the calculation and stay within the walltime failed')

                    # If this failed, ask the task to provide a method to speed up the task.
                    try:
                        self.speed_up()
                        self.reset_from_scratch()
                        self.set_status(self.S_READY, msg='task speedup')
                        return
                    except DecreaseDemandsError:
                        self.history.warning('decreasing demands failed')

                    msg = ('Time cancel error detected but the time could not be increased, nor could\n'
                           'the time demand be decreased by speeding up the task or by increasing the number of cpus.\n'
                           'Unrecoverable error.')
                    self.set_status(self.S_ERROR, msg)

                else:
                    msg = 'No solution provided for error %s. Unrecoverable error.' % error.name
                    self.set_status(self.S_ERROR, msg)

        return 0

    def autoparal_run(self):
        """
        Find an optimal set of parameters for the execution of the Optic task.
        This method can change the submission parameters, e.g. the number of CPUs for MPI and OpenMP.
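
        The autoparal configurations and the selected optimal configuration are also written
        to the autoparal.json file in the working directory.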
4426 4427 Returns 0 if success 4428 """ 4429 policy = self.manager.policy 4430 4431 if policy.autoparal == 0: # or policy.max_ncpus in [None, 1]: 4432 self.history.info("Nothing to do in autoparal, returning (None, None)") 4433 return 0 4434 4435 if policy.autoparal != 1: 4436 raise NotImplementedError("autoparal != 1") 4437 4438 ############################################################################ 4439 # Run ABINIT in sequential to get the possible configurations with max_ncpus 4440 ############################################################################ 4441 4442 # Set the variables for automatic parallelization 4443 # Will get all the possible configurations up to max_ncpus 4444 # Return immediately if max_ncpus == 1 4445 max_ncpus = self.manager.max_cores 4446 if max_ncpus == 1: return 0 4447 4448 autoparal_vars = dict(autoparal=policy.autoparal, max_ncpus=max_ncpus) 4449 self.set_vars(autoparal_vars) 4450 4451 # Run the job in a shell subprocess with mpi_procs = 1 4452 # we don't want to make a request to the queue manager for this simple job! 4453 # Return code is always != 0 4454 process = self.manager.to_shell_manager(mpi_procs=1).launch(self) 4455 self.history.pop() 4456 retcode = process.wait() 4457 # To avoid: ResourceWarning: unclosed file <_io.BufferedReader name=87> in py3k 4458 process.stderr.close() 4459 #process.stdout.close() 4460 4461 # Remove the variables added for the automatic parallelization 4462 self.input.remove_vars(list(autoparal_vars.keys())) 4463 4464 ############################################################## 4465 # Parse the autoparal configurations from the main output file 4466 ############################################################## 4467 parser = ParalHintsParser() 4468 try: 4469 pconfs = parser.parse(self.output_file.path) 4470 except parser.Error: 4471 # In principle Abinit should have written a complete log file 4472 # because we called .wait() but sometimes the Yaml doc is incomplete and 4473 # the parser raises. Let's wait 5 secs and then try again. 4474 time.sleep(5) 4475 try: 4476 pconfs = parser.parse(self.output_file.path) 4477 except parser.Error: 4478 self.history.critical("Error while parsing Autoparal section:\n%s" % straceback()) 4479 return 2 4480 4481 ###################################################### 4482 # Select the optimal configuration according to policy 4483 ###################################################### 4484 #optconf = self.find_optconf(pconfs) 4485 # Select the partition on which we'll be running and set MPI/OMP cores. 4486 optconf = self.manager.select_qadapter(pconfs) 4487 4488 #################################################### 4489 # Change the input file and/or the submission script 4490 #################################################### 4491 self.set_vars(optconf.vars) 4492 4493 # Write autoparal configurations to JSON file. 4494 d = pconfs.as_dict() 4495 d["optimal_conf"] = optconf 4496 json_pretty_dump(d, os.path.join(self.workdir, "autoparal.json")) 4497 4498 ############## 4499 # Finalization 4500 ############## 4501 # Reset the status, remove garbage files ... 4502 self.set_status(self.S_INIT, msg='finished auto paralell') 4503 4504 # Remove the output file since Abinit likes to create new files 4505 # with extension .outA, .outB if the file already exists. 
4506 os.remove(self.output_file.path) 4507 #os.remove(self.log_file.path) 4508 os.remove(self.stderr_file.path) 4509 4510 return 0 4511 4512 4513class AnaddbTask(Task): 4514 """Task for Anaddb runs (post-processing of DFPT calculations).""" 4515 4516 color_rgb = np.array((204, 102, 255)) / 255 4517 4518 def __init__(self, anaddb_input, ddb_node, 4519 gkk_node=None, md_node=None, ddk_node=None, workdir=None, manager=None): 4520 """ 4521 Create an instance of AnaddbTask from a string containing the input. 4522 4523 Args: 4524 anaddb_input: string with the anaddb variables. 4525 ddb_node: The node that will produce the DDB file. Accept |Task|, |Work| or filepath. 4526 gkk_node: The node that will produce the GKK file (optional). Accept |Task|, |Work| or filepath. 4527 md_node: The node that will produce the MD file (optional). Accept |Task|, |Work| or filepath. 4528 gkk_node: The node that will produce the GKK file (optional). Accept |Task|, |Work| or filepath. 4529 workdir: Path to the working directory (optional). 4530 manager: |TaskManager| object (optional). 4531 """ 4532 # Keep a reference to the nodes. 4533 self.ddb_node = Node.as_node(ddb_node) 4534 deps = {self.ddb_node: "DDB"} 4535 4536 self.gkk_node = Node.as_node(gkk_node) 4537 if self.gkk_node is not None: 4538 deps.update({self.gkk_node: "GKK"}) 4539 4540 # I never used it! 4541 self.md_node = Node.as_node(md_node) 4542 if self.md_node is not None: 4543 deps.update({self.md_node: "MD"}) 4544 4545 self.ddk_node = Node.as_node(ddk_node) 4546 if self.ddk_node is not None: 4547 deps.update({self.ddk_node: "DDK"}) 4548 4549 super().__init__(input=anaddb_input, workdir=workdir, manager=manager, deps=deps) 4550 4551 @classmethod 4552 def temp_shell_task(cls, inp, ddb_node, mpi_procs=1, 4553 gkk_node=None, md_node=None, ddk_node=None, workdir=None, manager=None): 4554 """ 4555 Build a |AnaddbTask| with a temporary workdir. The task is executed via 4556 the shell with 1 MPI proc. Mainly used for post-processing the DDB files. 4557 4558 Args: 4559 mpi_procs: Number of MPI processes to use. 4560 anaddb_input: string with the anaddb variables. 4561 ddb_node: The node that will produce the DDB file. Accept |Task|, |Work| or filepath. 4562 4563 See `AnaddbInit` for the meaning of the other arguments. 4564 """ 4565 # Build a simple manager to run the job in a shell subprocess 4566 import tempfile 4567 workdir = tempfile.mkdtemp() if workdir is None else workdir 4568 if manager is None: manager = TaskManager.from_user_config() 4569 4570 # Construct the task and run it 4571 return cls(inp, ddb_node, 4572 gkk_node=gkk_node, md_node=md_node, ddk_node=ddk_node, 4573 workdir=workdir, manager=manager.to_shell_manager(mpi_procs=mpi_procs)) 4574 4575 @property 4576 def executable(self): 4577 """Path to the executable required for running the |AnaddbTask|.""" 4578 try: 4579 return self._executable 4580 except AttributeError: 4581 return "anaddb" 4582 4583 @property 4584 def filesfile_string(self): 4585 """String with the list of files and prefixes needed to execute ABINIT.""" 4586 lines = [] 4587 app = lines.append 4588 4589 app(self.input_file.path) # 1) Path of the input file 4590 app(self.output_file.path) # 2) Path of the output file 4591 app(self.ddb_filepath) # 3) Input derivative database e.g. t13.ddb.in 4592 app(self.md_filepath) # 4) Output molecular dynamics e.g. t13.md 4593 app(self.gkk_filepath) # 5) Input elphon matrix elements (GKK file) 4594 app(self.outdir.path_join("out")) # 6) Base name for elphon output files e.g. 
t13 4595 app(self.ddk_filepath) # 7) File containing ddk filenames for elphon/transport. 4596 4597 return "\n".join(lines) 4598 4599 @property 4600 def ddb_filepath(self): 4601 """Returns (at runtime) the absolute path of the input DDB file.""" 4602 # This is not very elegant! A possible approach could to be path self.ddb_node.outdir! 4603 if isinstance(self.ddb_node, FileNode): return self.ddb_node.filepath 4604 path = self.ddb_node.outdir.has_abiext("DDB") 4605 return path if path else "DDB_FILE_DOES_NOT_EXIST" 4606 4607 @property 4608 def md_filepath(self): 4609 """Returns (at runtime) the absolute path of the input MD file.""" 4610 if self.md_node is None: return "MD_FILE_DOES_NOT_EXIST" 4611 if isinstance(self.md_node, FileNode): return self.md_node.filepath 4612 4613 path = self.md_node.outdir.has_abiext("MD") 4614 return path if path else "MD_FILE_DOES_NOT_EXIST" 4615 4616 @property 4617 def gkk_filepath(self): 4618 """Returns (at runtime) the absolute path of the input GKK file.""" 4619 if self.gkk_node is None: return "GKK_FILE_DOES_NOT_EXIST" 4620 if isinstance(self.gkk_node, FileNode): return self.gkk_node.filepath 4621 4622 path = self.gkk_node.outdir.has_abiext("GKK") 4623 return path if path else "GKK_FILE_DOES_NOT_EXIST" 4624 4625 @property 4626 def ddk_filepath(self): 4627 """Returns (at runtime) the absolute path of the input DKK file.""" 4628 if self.ddk_node is None: return "DDK_FILE_DOES_NOT_EXIST" 4629 if isinstance(self.ddk_node, FileNode): return self.ddk_node.filepath 4630 4631 path = self.ddk_node.outdir.has_abiext("DDK") 4632 return path if path else "DDK_FILE_DOES_NOT_EXIST" 4633 4634 def setup(self): 4635 """Public method called before submitting the task.""" 4636 4637 def make_links(self): 4638 """ 4639 Anaddb allows the user to specify the paths of the input file. 4640 hence we don't need to create symbolic links. 4641 """ 4642 4643 def outpath_from_ext(self, ext): 4644 if ext == "anaddb.nc": 4645 path = os.path.join(self.workdir, "anaddb.nc") 4646 if os.path.isfile(path): return path 4647 4648 path = self.wdir.has_abiext(ext) 4649 if not path: 4650 raise RuntimeError("Anaddb task `%s` didn't produce file with extenstion: `%s`" % (self, ext)) 4651 4652 return path 4653 4654 def open_phbst(self): 4655 """Open PHBST file produced by Anaddb and returns |PhbstFile| object.""" 4656 from abipy.dfpt.phonons import PhbstFile 4657 phbst_path = self.outpath_from_ext("PHBST.nc") 4658 return PhbstFile(phbst_path) 4659 4660 def open_phdos(self): 4661 """Open PHDOS file produced by Anaddb and returns |PhdosFile| object.""" 4662 from abipy.dfpt.phonons import PhdosFile 4663 phdos_path = self.outpath_from_ext("PHDOS.nc") 4664 return PhdosFile(phdos_path) 4665 4666 def get_results(self, **kwargs): 4667 return super().get_results(**kwargs) 4668 4669 4670#class BoxcuttedPhononTask(PhononTask): 4671# """ 4672# This task compute phonons with a two-step algorithm. 4673# The first DFPT run is done with low-accuracy settings for boxcutmin and ecut 4674# The second DFPT run uses boxcutmin 2.0 and normal ecut and restarts from 4675# the 1WFK file generated previously. 
4676# """ 4677# @classmethod 4678# def patch_flow(cls, flow): 4679# for task in flow.iflat_tasks(): 4680# if isinstance(task, PhononTask): task.__class__ = cls 4681# 4682# def setup(self): 4683# super().setup() 4684# self.final_dfp_done = False if not hasattr(self, "final_dfp_done") else self.final_dfp_done 4685# if not self.final_dfp_done: 4686# # First run: use boxcutmin 1.5 and low-accuracy hints (assume pseudos with hints). 4687# pseudos = self.input.pseudos 4688# ecut = max(p.hint_for_accuracy("low").ecut for p in pseudos) 4689# pawecutdg = max(p.hint_for_accuracy("low").pawecutdg for p in pseudos) if self.input.ispaw else None 4690# self.set_vars(boxcutmin=1.5, ecut=ecut, pawecutdg=pawecutdg, prtwf=1) 4691# 4692# def _on_ok(self): 4693# results = super()._on_ok() 4694# if not self.final_dfp_done: 4695# # Second run: use exact box and normal-accuracy hints (assume pseudos with hints). 4696# pseudos = self.input.pseudos 4697# ecut = max(p.hint_for_accuracy("normal").ecut for p in pseudos) 4698# pawecutdg = max(p.hint_for_accuracy("normal").pawecutdg for p in pseudos) if self.input.ispaw else None 4699# self.set_vars(boxcutmin=2.0, ecut=ecut, pawecutdg=pawecutdg, prtwf=-1) 4700# self.finalized = True 4701# self.final_dfp_done = True 4702# self.restart() 4703# 4704# return results 4705