1# coding: utf-8 2""" 3Works for Abinit 4""" 5 6import os 7import shutil 8import time 9import abc 10import collections 11import numpy as np 12 13from monty.collections import AttrDict 14from monty.itertools import chunks 15from monty.functools import lazy_property 16from monty.fnmatch import WildCard 17from pydispatch import dispatcher 18from pymatgen.core.units import EnergyArray 19from . import wrappers 20from .nodes import Dependency, Node, NodeError, NodeResults, FileNode #, check_spectator 21from .tasks import (Task, AbinitTask, ScfTask, NscfTask, DfptTask, PhononTask, ElasticTask, DdkTask, EffMassTask, 22 BseTask, RelaxTask, DdeTask, BecTask, ScrTask, SigmaTask, TaskManager, 23 DteTask, EphTask, CollinearThenNonCollinearScfTask) 24 25from .utils import Directory 26from .netcdf import ETSF_Reader, NetcdfReader 27from .abitimer import AbinitTimerParser 28 29__author__ = "Matteo Giantomassi" 30__copyright__ = "Copyright 2013, The Materials Project" 31__version__ = "0.1" 32__maintainer__ = "Matteo Giantomassi" 33 34 35__all__ = [ 36 "Work", 37 "BandStructureWork", 38 "RelaxWork", 39 "G0W0Work", 40 "QptdmWork", 41 "SigmaConvWork", 42 "BseMdfWork", 43 "PhononWork", 44 "PhononWfkqWork", 45 "GKKPWork", 46 "BecWork", 47 "DteWork", 48 "ConducWork", 49] 50 51 52class WorkResults(NodeResults): 53 JSON_SCHEMA = NodeResults.JSON_SCHEMA.copy() 54 55 @classmethod 56 def from_node(cls, work): 57 """Initialize an instance from a |Work| instance.""" 58 new = super().from_node(work) 59 60 # Will put all files found in outdir in GridFs 61 # Warning: assuming binary files. 62 d = {os.path.basename(f): f for f in work.outdir.list_filepaths()} 63 new.register_gridfs_files(**d) 64 65 return new 66 67 68class WorkError(NodeError): 69 """Base class for the exceptions raised by Work objects.""" 70 71 72class BaseWork(Node, metaclass=abc.ABCMeta): 73 """ 74 .. rubric:: Inheritance Diagram 75 .. inheritance-diagram:: BaseWork 76 """ 77 Error = WorkError 78 79 Results = WorkResults 80 81 # interface modeled after subprocess.Popen 82 @property 83 @abc.abstractmethod 84 def processes(self): 85 """Return a list of objects that support the `subprocess.Popen` protocol.""" 86 87 def poll(self): 88 """ 89 Check if all child processes have terminated. Set and return returncode attribute. 90 """ 91 return [task.poll() for task in self] 92 93 def wait(self): 94 """ 95 Wait for child processed to terminate. Set and return returncode attribute. 96 """ 97 return [task.wait() for task in self] 98 99 def communicate(self, input=None): 100 """ 101 Interact with processes: Send data to stdin. Read data from stdout and 102 stderr, until end-of-file is reached. 103 Wait for process to terminate. The optional input argument should be a 104 string to be sent to the child processed, or None, if no data should be 105 sent to the children. 106 107 communicate() returns a list of tuples (stdoutdata, stderrdata). 108 """ 109 return [task.communicate(input) for task in self] 110 111 @property 112 def returncodes(self): 113 """ 114 The children return codes, set by poll() and wait() (and indirectly by communicate()). 115 A None value indicates that the process hasn't terminated yet. 116 A negative value -N indicates that the child was terminated by signal N (Unix only). 117 """ 118 return [task.returncode for task in self] 119 120 @property 121 def ncores_reserved(self): 122 """ 123 Returns the number of cores reserved in this moment. 124 A core is reserved if it's still not running but 125 we have submitted the task to the queue manager. 126 """ 127 return sum(task.manager.num_cores for task in self if task.status == task.S_SUB) 128 129 @property 130 def ncores_allocated(self): 131 """ 132 Returns the number of CPUs allocated in this moment. 133 A core is allocated if it's running a task or if we have 134 submitted a task to the queue manager but the job is still pending. 135 """ 136 return sum(task.manager.num_cores for task in self if task.status in [task.S_SUB, task.S_RUN]) 137 138 @property 139 def ncores_used(self): 140 """ 141 Returns the number of cores used in this moment. 142 A core is used if there's a job that is running on it. 143 """ 144 return sum(task.manager.num_cores for task in self if task.status == task.S_RUN) 145 146 def fetch_task_to_run(self): 147 """ 148 Returns the first task that is ready to run or 149 None if no task can be submitted at present" 150 151 Raises: 152 `StopIteration` if all tasks are done. 153 """ 154 # All the tasks are done so raise an exception 155 # that will be handled by the client code. 156 if all(task.is_completed for task in self): 157 raise StopIteration("All tasks completed.") 158 159 for task in self: 160 if task.can_run: 161 return task 162 163 # No task found, this usually happens when we have dependencies. 164 # Beware of possible deadlocks here! 165 self.history.warning("Possible deadlock in fetch_task_to_run!") 166 return None 167 168 def fetch_alltasks_to_run(self): 169 """ 170 Returns a list with all the tasks that can be submitted. 171 Empty list if not task has been found. 172 """ 173 return [task for task in self if task.can_run] 174 175 @abc.abstractmethod 176 def setup(self, *args, **kwargs): 177 """Method called before submitting the calculations.""" 178 179 def _setup(self, *args, **kwargs): 180 self.setup(*args, **kwargs) 181 182 def connect_signals(self): 183 """ 184 Connect the signals within the work. 185 The |Work| is responsible for catching the important signals raised from 186 its task and raise new signals when some particular condition occurs. 187 """ 188 for task in self: 189 dispatcher.connect(self.on_ok, signal=task.S_OK, sender=task) 190 191 def disconnect_signals(self): 192 """ 193 Disable the signals within the work. This function reverses the process of `connect_signals` 194 """ 195 for task in self: 196 try: 197 dispatcher.disconnect(self.on_ok, signal=task.S_OK, sender=task) 198 except dispatcher.errors.DispatcherKeyError as exc: 199 self.history.debug(str(exc)) 200 201 @property 202 def all_ok(self): 203 return all(task.status == task.S_OK for task in self) 204 205 #@check_spectator 206 def on_ok(self, sender): 207 """ 208 This callback is called when one task reaches status `S_OK`. 209 It executes on_all_ok when all tasks in self have reached `S_OK`. 210 """ 211 self.history.debug("In on_ok with sender %s" % sender) 212 213 if self.all_ok: 214 if self.finalized: 215 return AttrDict(returncode=0, message="Work has been already finalized") 216 else: 217 # Set finalized here, because on_all_ok might change it (e.g. Relax + EOS in a single work) 218 self.finalized = True 219 try: 220 results = AttrDict(**self.on_all_ok()) 221 except Exception as exc: 222 self.history.critical("on_all_ok raises %s" % str(exc)) 223 self.finalized = False 224 raise 225 226 # Signal to possible observers that the `Work` reached S_OK 227 self.history.info("Work %s is finalized and broadcasts signal S_OK" % str(self)) 228 if self._finalized: 229 self.send_signal(self.S_OK) 230 231 return results 232 233 return AttrDict(returncode=1, message="Not all tasks are OK!") 234 235 #@check_spectator 236 def on_all_ok(self): 237 """ 238 This method is called once the `Work` is completed i.e. when all tasks have reached status S_OK. 239 Subclasses should provide their own implementation 240 241 Returns: 242 Dictionary that must contain at least the following entries: 243 returncode: 0 on success. 244 message: a string that should provide a human-readable description of what has been performed. 245 """ 246 return dict(returncode=0, message="Calling on_all_ok of the base class!") 247 248 def get_results(self, **kwargs): 249 """ 250 Method called once the calculations are completed. 251 The base version returns a dictionary task_name: TaskResults for each task in self. 252 """ 253 results = self.Results.from_node(self) 254 return results 255 256 def get_graphviz(self, engine="automatic", graph_attr=None, node_attr=None, edge_attr=None): 257 """ 258 Generate task graph in the DOT language (only parents and children of this work). 259 260 Args: 261 engine: Layout command used. ['dot', 'neato', 'twopi', 'circo', 'fdp', 'sfdp', 'patchwork', 'osage'] 262 graph_attr: Mapping of (attribute, value) pairs for the graph. 263 node_attr: Mapping of (attribute, value) pairs set for all nodes. 264 edge_attr: Mapping of (attribute, value) pairs set for all edges. 265 266 Returns: graphviz.Digraph <https://graphviz.readthedocs.io/en/stable/api.html#digraph> 267 """ 268 from graphviz import Digraph 269 fg = Digraph("work", #filename="work_%s.gv" % os.path.basename(self.workdir), 270 engine="fdp" if engine == "automatic" else engine) 271 272 # Set graph attributes. 273 # https://www.graphviz.org/doc/info/ 274 #fg.attr(label="%s@%s" % (self.__class__.__name__, self.relworkdir)) 275 fg.attr(label=repr(self)) 276 #fg.attr(fontcolor="white", bgcolor='purple:pink') 277 fg.attr(rankdir="LR", pagedir="BL") 278 #fg.attr(constraint="false", pack="true", packMode="clust") 279 fg.node_attr.update(color='lightblue2', style='filled') 280 #fg.node_attr.update(ranksep='equally') 281 282 # Add input attributes. 283 if graph_attr is not None: 284 fg.graph_attr.update(**graph_attr) 285 if node_attr is not None: 286 fg.node_attr.update(**node_attr) 287 if edge_attr is not None: 288 fg.edge_attr.update(**edge_attr) 289 290 def node_kwargs(node): 291 return dict( 292 #shape="circle", 293 color=node.color_hex, 294 label=(str(node) if not hasattr(node, "pos_str") else 295 node.pos_str + "\n" + node.__class__.__name__), 296 ) 297 298 edge_kwargs = dict(arrowType="vee", style="solid") 299 cluster_kwargs = dict(rankdir="LR", pagedir="BL", style="rounded", bgcolor="azure2") 300 301 # Build cluster with tasks in *this* work 302 cluster_name = "cluster%s" % self.name 303 with fg.subgraph(name=cluster_name) as wg: 304 wg.attr(**cluster_kwargs) 305 wg.attr(label="%s (%s)" % (self.__class__.__name__, self.name)) 306 for task in self: 307 wg.node(task.name, **node_kwargs(task)) 308 # Connect task to children 309 for child in task.get_children(): 310 # Test if child is in this cluster (self). 311 myg = wg if child in self else fg 312 myg.node(child.name, **node_kwargs(child)) 313 # Find file extensions required by this task 314 i = [dep.node for dep in child.deps].index(task) 315 edge_label = "+".join(child.deps[i].exts) 316 myg.edge(task.name, child.name, label=edge_label, color=task.color_hex, 317 **edge_kwargs) 318 319 # Connect task to parents 320 for parent in task.get_parents(): 321 # Test if parent is in this cluster (self). 322 myg = wg if parent in self else fg 323 myg.node(parent.name, **node_kwargs(parent)) 324 # Find file extensions required by this task 325 i = [dep.node for dep in task.deps].index(parent) 326 edge_label = "+".join(task.deps[i].exts) 327 myg.edge(parent.name, task.name, label=edge_label, color=parent.color_hex, 328 **edge_kwargs) 329 330 # Treat the case in which we have a work producing output for tasks in *this* work. 331 #for work in self.flow: 332 # children = work.get_children() 333 # if not children or all(child not in self for child in children): 334 # continue 335 # cluster_name = "cluster%s" % work.name 336 # seen = set() 337 # for child in children: 338 # if child not in self: continue 339 # # This is not needed, too much confusing 340 # #fg.edge(cluster_name, child.name, color=work.color_hex, **edge_kwargs) 341 # # Find file extensions required by work 342 # i = [dep.node for dep in child.deps].index(work) 343 # for ext in child.deps[i].exts: 344 # out = "%s (%s)" % (ext, work.name) 345 # fg.node(out) 346 # fg.edge(out, child.name, **edge_kwargs) 347 # key = (cluster_name, out) 348 # if key not in seen: 349 # fg.edge(cluster_name, out, color=work.color_hex, **edge_kwargs) 350 # seen.add(key) 351 352 return fg 353 354 355class NodeContainer(metaclass=abc.ABCMeta): 356 """ 357 Mixin classes for `Work` and `Flow` objects providing helper functions 358 to register tasks in the container. The helper function call the 359 `register` method of the container. 360 """ 361 # Abstract protocol for containers 362 363 @abc.abstractmethod 364 def register_task(self, *args, **kwargs): 365 """ 366 Register a task in the container. 367 """ 368 # TODO: shall flow.register_task return a Task or a Work? 369 370 # Helper functions to register Task subclasses. 371 def register_scf_task(self, *args, **kwargs): 372 """Register a Scf task.""" 373 kwargs["task_class"] = ScfTask 374 return self.register_task(*args, **kwargs) 375 376 def register_collinear_then_noncollinear_scf_task(self, *args, **kwargs): 377 """Register a Scf task that perform a SCF run first with nsppol = 2 and then nspinor = 2""" 378 kwargs["task_class"] = CollinearThenNonCollinearScfTask 379 return self.register_task(*args, **kwargs) 380 381 def register_nscf_task(self, *args, **kwargs): 382 """Register a nscf task.""" 383 kwargs["task_class"] = NscfTask 384 task = self.register_task(*args, **kwargs) 385 386 # Make sure parent producing DEN file is given 387 if task.is_work: task = task[-1] 388 den_parent = task.find_parent_with_ext("DEN") 389 if den_parent is None: 390 raise ValueError("NSCF task %s\nrequires parent producing DEN file!" % repr(task)) 391 392 if task.input.get("usekden", 0) == 1: 393 # Meta-GGA calculation --> Add KDEN if not explicitly given. 394 # Assuming prtkden already set to 1 395 # TODO: Abinit should automatically set it to 1 if usekden --> I'm not gonna fix the input at this level 396 kden_parent = task.find_parent_with_ext("KDEN") 397 if kden_parent is None: 398 task.add_deps({den_parent: "KDEN"}) 399 400 return task 401 402 def register_relax_task(self, *args, **kwargs): 403 """Register a task for structural optimization.""" 404 kwargs["task_class"] = RelaxTask 405 return self.register_task(*args, **kwargs) 406 407 def register_phonon_task(self, *args, **kwargs): 408 """Register a phonon task.""" 409 kwargs["task_class"] = PhononTask 410 return self.register_task(*args, **kwargs) 411 412 def register_elastic_task(self, *args, **kwargs): 413 """Register an elastic task.""" 414 kwargs["task_class"] = ElasticTask 415 return self.register_task(*args, **kwargs) 416 417 def register_ddk_task(self, *args, **kwargs): 418 """Register a DDK task.""" 419 kwargs["task_class"] = DdkTask 420 return self.register_task(*args, **kwargs) 421 422 def register_effmass_task(self, *args, **kwargs): 423 """Register a effective mass task.""" 424 kwargs["task_class"] = EffMassTask 425 # FIXME: Hack to run it in sequential because effmass task does not support parallelism. 426 kwargs.update({"manager": TaskManager.from_user_config().new_with_fixed_mpi_omp(1, 1)}) 427 return self.register_task(*args, **kwargs) 428 429 def register_scr_task(self, *args, **kwargs): 430 """Register a screening task.""" 431 kwargs["task_class"] = ScrTask 432 return self.register_task(*args, **kwargs) 433 434 def register_sigma_task(self, *args, **kwargs): 435 """Register a sigma task.""" 436 kwargs["task_class"] = SigmaTask 437 return self.register_task(*args, **kwargs) 438 439 def register_dde_task(self, *args, **kwargs): 440 """Register a Dde task.""" 441 kwargs["task_class"] = DdeTask 442 return self.register_task(*args, **kwargs) 443 444 def register_dte_task(self, *args, **kwargs): 445 """Register a Dte task.""" 446 kwargs["task_class"] = DteTask 447 return self.register_task(*args, **kwargs) 448 449 def register_bec_task(self, *args, **kwargs): 450 """Register a BEC task.""" 451 kwargs["task_class"] = BecTask 452 return self.register_task(*args, **kwargs) 453 454 def register_bse_task(self, *args, **kwargs): 455 """Register a Bethe-Salpeter task.""" 456 kwargs["task_class"] = BseTask 457 return self.register_task(*args, **kwargs) 458 459 def register_eph_task(self, *args, **kwargs): 460 """Register an electron-phonon task.""" 461 kwargs["task_class"] = EphTask 462 eph_inp = args[0] 463 seq_manager = TaskManager.from_user_config().new_with_fixed_mpi_omp(1, 1) 464 if eph_inp.get("eph_frohlichm", 0) != 0 or abs(eph_inp.get("eph_task", 0)) == 15: 465 # FIXME: Hack to run task in sequential if calculation does not support MPI with nprocs > 1. 466 kwargs.update({"manager": seq_manager}) 467 468 return self.register_task(*args, **kwargs) 469 470 def walknset_vars(self, task_class=None, *args, **kwargs): 471 """ 472 Set the values of the ABINIT variables in the input files of the nodes 473 474 Args: 475 task_class: If not None, only the input files of the tasks belonging 476 to class `task_class` are modified. 477 478 Example: 479 480 flow.walknset_vars(ecut=10, kptopt=4) 481 """ 482 def change_task(task): 483 if task_class is not None and task.__class__ is not task_class: return False 484 return True 485 486 if self.is_work: 487 for task in self: 488 if not change_task(task): continue 489 task.set_vars(*args, **kwargs) 490 491 elif self.is_flow: 492 for task in self.iflat_tasks(): 493 if not change_task(task): continue 494 task.set_vars(*args, **kwargs) 495 496 else: 497 raise TypeError("Don't know how to set variables for object class %s" % self.__class__.__name__) 498 499 500class Work(BaseWork, NodeContainer): 501 """ 502 A Work is a list of (possibly connected) tasks. 503 504 .. rubric:: Inheritance Diagram 505 .. inheritance-diagram:: Work 506 """ 507 def __init__(self, workdir=None, manager=None): 508 """ 509 Args: 510 workdir: Path to the working directory. 511 manager: |TaskManager| object. 512 """ 513 super().__init__() 514 515 self._tasks = [] 516 517 if workdir is not None: 518 self.set_workdir(workdir) 519 520 if manager is not None: 521 self.set_manager(manager) 522 523 def set_manager(self, manager): 524 """Set the |TaskManager| to use to launch the |Task|.""" 525 self.manager = manager.deepcopy() 526 for task in self: 527 task.set_manager(manager) 528 529 @property 530 def flow(self): 531 """The flow containing this |Work|.""" 532 return self._flow 533 534 def set_flow(self, flow): 535 """Set the flow associated to this |Work|.""" 536 if not hasattr(self, "_flow"): 537 self._flow = flow 538 else: 539 if self._flow != flow: 540 raise ValueError("self._flow != flow") 541 542 @lazy_property 543 def pos(self): 544 """The position of self in the |Flow|""" 545 for i, work in enumerate(self.flow): 546 if self == work: 547 return i 548 raise ValueError("Cannot find the position of %s in flow %s" % (self, self.flow)) 549 550 @property 551 def pos_str(self): 552 """String representation of self.pos""" 553 return "w" + str(self.pos) 554 555 def set_workdir(self, workdir, chroot=False): 556 """Set the working directory. Cannot be set more than once unless chroot is True""" 557 if not chroot and hasattr(self, "workdir") and self.workdir != workdir: 558 raise ValueError("self.workdir != workdir: %s, %s" % (self.workdir, workdir)) 559 560 self.workdir = os.path.abspath(workdir) 561 562 # Directories with (input|output|temporary) data. 563 # The work will use these directories to connect 564 # itself to other works and/or to produce new data 565 # that will be used by its children. 566 self.indir = Directory(os.path.join(self.workdir, "indata")) 567 self.outdir = Directory(os.path.join(self.workdir, "outdata")) 568 self.tmpdir = Directory(os.path.join(self.workdir, "tmpdata")) 569 self.wdir = Directory(self.workdir) 570 571 def chroot(self, new_workdir): 572 self.set_workdir(new_workdir, chroot=True) 573 574 for i, task in enumerate(self): 575 new_tdir = os.path.join(self.workdir, "t" + str(i)) 576 task.set_workdir(new_tdir, chroot=True) 577 578 def __len__(self): 579 return len(self._tasks) 580 581 def __iter__(self): 582 return self._tasks.__iter__() 583 584 def __getitem__(self, slice): 585 return self._tasks[slice] 586 587 def chunks(self, chunk_size): 588 """Yield successive chunks of tasks of lenght chunk_size.""" 589 for tasks in chunks(self, chunk_size): 590 yield tasks 591 592 def ipath_from_ext(self, ext): 593 """ 594 Returns the path of the output file with extension ext. 595 Use it when the file does not exist yet. 596 """ 597 return self.indir.path_in("in_" + ext) 598 599 def opath_from_ext(self, ext): 600 """ 601 Returns the path of the output file with extension ext. 602 Use it when the file does not exist yet. 603 """ 604 return self.outdir.path_in("out_" + ext) 605 606 @property 607 def processes(self): 608 return [task.process for task in self] 609 610 @property 611 def all_done(self): 612 """True if all the |Task| objects in the |Work| are done.""" 613 return all(task.status >= task.S_DONE for task in self) 614 615 @property 616 def isnc(self): 617 """True if norm-conserving calculation.""" 618 return all(task.isnc for task in self) 619 620 @property 621 def ispaw(self): 622 """True if PAW calculation.""" 623 return all(task.ispaw for task in self) 624 625 @property 626 def status_counter(self): 627 """ 628 Returns a `Counter` object that counts the number of task with 629 given status (use the string representation of the status as key). 630 """ 631 counter = collections.Counter() 632 633 for task in self: 634 counter[str(task.status)] += 1 635 636 return counter 637 638 def allocate(self, manager=None): 639 """ 640 This function is called once we have completed the initialization 641 of the |Work|. It sets the manager of each task (if not already done) 642 and defines the working directories of the tasks. 643 644 Args: 645 manager: |TaskManager| object or None 646 """ 647 for i, task in enumerate(self): 648 649 if not hasattr(task, "manager"): 650 # Set the manager 651 # Use the one provided in input else the one of the work/flow. 652 if manager is not None: 653 task.set_manager(manager) 654 else: 655 # Look first in work and then in the flow. 656 if hasattr(self, "manager"): 657 task.set_manager(self.manager) 658 else: 659 task.set_manager(self.flow.manager) 660 661 task_workdir = os.path.join(self.workdir, "t" + str(i)) 662 663 if not hasattr(task, "workdir"): 664 task.set_workdir(task_workdir) 665 else: 666 if task.workdir != task_workdir: 667 raise ValueError("task.workdir != task_workdir: %s, %s" % (task.workdir, task_workdir)) 668 669 def register(self, obj, deps=None, required_files=None, manager=None, task_class=None): 670 """ 671 Registers a new |Task| and add it to the internal list, taking into account possible dependencies. 672 673 Args: 674 obj: |AbinitInput| instance or |Task| object. 675 deps: Dictionary specifying the dependency of this node or list of dependencies 676 None means that this obj has no dependency. 677 required_files: List of strings with the path of the files used by the task. 678 Note that the files must exist when the task is registered. 679 Use the standard approach based on Works, Tasks and deps 680 if the files will be produced in the future. 681 manager: 682 The |TaskManager| responsible for the submission of the task. If manager is None, we use 683 the `TaskManager` specified during the creation of the |Work|. 684 task_class: Task subclass to instantiate. Default: :class:`AbinitTask` 685 686 Returns: |Task| object 687 """ 688 task_workdir = None 689 if hasattr(self, "workdir"): 690 task_workdir = os.path.join(self.workdir, "t" + str(len(self))) 691 692 if isinstance(obj, Task): 693 task = obj 694 695 else: 696 # Set the class 697 if task_class is None: 698 task_class = AbinitTask 699 700 task = task_class.from_input(obj, task_workdir, manager) 701 702 self._tasks.append(task) 703 704 # Handle possible dependencies given either as dict or list. 705 if deps is not None: 706 if hasattr(deps, "items"): 707 deps = [Dependency(node, exts) for node, exts in deps.items()] 708 task.add_deps(deps) 709 710 # Handle possible dependencies. 711 if required_files is not None: 712 task.add_required_files(required_files) 713 714 return task 715 716 # Needed by NodeContainer 717 register_task = register 718 719 def path_in_workdir(self, filename): 720 """Create the absolute path of filename in the working directory.""" 721 return os.path.join(self.workdir, filename) 722 723 def setup(self, *args, **kwargs): 724 """ 725 Method called before running the calculations. 726 The default implementation is empty. 727 """ 728 729 def build(self, *args, **kwargs): 730 """Creates the top level directory.""" 731 # Create the directories of the work. 732 self.indir.makedirs() 733 self.outdir.makedirs() 734 self.tmpdir.makedirs() 735 736 # Build dirs and files of each task. 737 for task in self: 738 task.build(*args, **kwargs) 739 740 # Connect signals within the work. 741 self.connect_signals() 742 743 @property 744 def status(self): 745 """ 746 Returns the status of the work i.e. the minimum of the status of the tasks. 747 """ 748 return self.get_all_status(only_min=True) 749 750 def get_all_status(self, only_min=False): 751 """ 752 Returns a list with the status of the tasks in self. 753 754 Args: 755 only_min: If True, the minimum of the status is returned. 756 """ 757 if len(self) == 0: 758 # The work will be created in the future. 759 if only_min: 760 return self.S_INIT 761 else: 762 return [self.S_INIT] 763 764 self.check_status() 765 status_list = [task.status for task in self] 766 767 if only_min: 768 return min(status_list) 769 else: 770 return status_list 771 772 def check_status(self): 773 """Check the status of the tasks.""" 774 # Recompute the status of the tasks 775 # Ignore OK and LOCKED tasks. 776 for task in self: 777 if task.status in (task.S_OK, task.S_LOCKED): continue 778 task.check_status() 779 780 # Take into account possible dependencies. Use a list instead of generators 781 for task in self: 782 if task.status == task.S_LOCKED: continue 783 if task.status < task.S_SUB and all(status == task.S_OK for status in task.deps_status): 784 task.set_status(task.S_READY, "Status set to Ready") 785 786 def rmtree(self, exclude_wildcard=""): 787 """ 788 Remove all files and directories in the working directory 789 790 Args: 791 exclude_wildcard: Optional string with regular expressions separated by `|`. 792 Files matching one of the regular expressions will be preserved. 793 example: exclude_wildard="*.nc|*.txt" preserves all the files 794 whose extension is in ["nc", "txt"]. 795 """ 796 if not exclude_wildcard: 797 shutil.rmtree(self.workdir) 798 799 else: 800 w = WildCard(exclude_wildcard) 801 for dirpath, dirnames, filenames in os.walk(self.workdir): 802 for fname in filenames: 803 path = os.path.join(dirpath, fname) 804 if not w.match(fname): 805 os.remove(path) 806 807 def rm_indatadir(self): 808 """Remove all the indata directories.""" 809 for task in self: 810 task.rm_indatadir() 811 812 def rm_outdatadir(self): 813 """Remove all the indata directories.""" 814 for task in self: 815 task.rm_outatadir() 816 817 def rm_tmpdatadir(self): 818 """Remove all the tmpdata directories.""" 819 for task in self: 820 task.rm_tmpdatadir() 821 822 def move(self, dest, isabspath=False): 823 """ 824 Recursively move self.workdir to another location. This is similar to the Unix "mv" command. 825 The destination path must not already exist. If the destination already exists 826 but is not a directory, it may be overwritten depending on os.rename() semantics. 827 828 Be default, dest is located in the parent directory of self.workdir, use isabspath=True 829 to specify an absolute path. 830 """ 831 if not isabspath: 832 dest = os.path.join(os.path.dirname(self.workdir), dest) 833 834 shutil.move(self.workdir, dest) 835 836 def submit_tasks(self, wait=False): 837 """ 838 Submits the task in self and wait. 839 TODO: change name. 840 """ 841 for task in self: 842 task.start() 843 844 if wait: 845 for task in self: task.wait() 846 847 def start(self, *args, **kwargs): 848 """ 849 Start the work. Calls build and _setup first, then submit the tasks. 850 Non-blocking call unless wait is set to True 851 """ 852 wait = kwargs.pop("wait", False) 853 854 # Initial setup 855 self._setup(*args, **kwargs) 856 857 # Build dirs and files. 858 self.build(*args, **kwargs) 859 860 # Submit tasks (does not block) 861 self.submit_tasks(wait=wait) 862 863 def read_etotals(self, unit="Ha"): 864 """ 865 Reads the total energy from the GSR file produced by the task. 866 867 Return a numpy array with the total energies in Hartree 868 The array element is set to np.inf if an exception is raised while reading the GSR file. 869 """ 870 if not self.all_done: 871 raise self.Error("Some task is still in running/submitted state") 872 873 etotals = [] 874 for task in self: 875 # Open the GSR file and read etotal (Hartree) 876 gsr_path = task.outdir.has_abiext("GSR") 877 etot = np.inf 878 if gsr_path: 879 with ETSF_Reader(gsr_path) as r: 880 etot = r.read_value("etotal") 881 882 etotals.append(etot) 883 884 return EnergyArray(etotals, "Ha").to(unit) 885 886 def parse_timers(self): 887 """ 888 Parse the TIMER section reported in the ABINIT output files. 889 890 Returns: 891 :class:`AbinitTimerParser` object 892 """ 893 filenames = list(filter(os.path.exists, [task.output_file.path for task in self])) 894 895 parser = AbinitTimerParser() 896 parser.parse(filenames) 897 898 return parser 899 900 901class BandStructureWork(Work): 902 """ 903 Work for band structure calculations. 904 905 .. rubric:: Inheritance Diagram 906 .. inheritance-diagram:: BandStructureWork 907 """ 908 909 def __init__(self, scf_input, nscf_input, dos_inputs=None, workdir=None, manager=None): 910 """ 911 Args: 912 scf_input: Input for the SCF run 913 nscf_input: Input for the NSCF run defining the band structure calculation. 914 dos_inputs: Input(s) for the DOS. DOS is computed only if dos_inputs is not None. 915 workdir: Working directory. 916 manager: |TaskManager| object. 917 """ 918 super().__init__(workdir=workdir, manager=manager) 919 920 # Register the GS-SCF run. 921 self.scf_task = self.register_scf_task(scf_input) 922 923 # Register the NSCF run and its dependency. 924 self.nscf_task = self.register_nscf_task(nscf_input, deps={self.scf_task: "DEN"}) 925 926 # Add DOS computation(s) if requested. 927 self.dos_tasks = [] 928 if dos_inputs is not None: 929 if not isinstance(dos_inputs, (list, tuple)): 930 dos_inputs = [dos_inputs] 931 932 for dos_input in dos_inputs: 933 dos_task = self.register_nscf_task(dos_input, deps={self.scf_task: "DEN"}) 934 self.dos_tasks.append(dos_task) 935 936 def plot_ebands(self, **kwargs): 937 """ 938 Plot the band structure. kwargs are passed to the plot method of |ElectronBands|. 939 940 Return: |matplotlib-Figure| 941 """ 942 with self.nscf_task.open_gsr() as gsr: 943 return gsr.ebands.plot(**kwargs) 944 945 def plot_ebands_with_edos(self, dos_pos=0, method="gaussian", step=0.01, width=0.1, **kwargs): 946 """ 947 Plot the band structure and the DOS. 948 949 Args: 950 dos_pos: Index of the task from which the DOS should be obtained (note: 0 refers to the first DOS task). 951 method: String defining the method for the computation of the DOS. 952 step: Energy step (eV) of the linear mesh. 953 width: Standard deviation (eV) of the gaussian. 954 kwargs: Keyword arguments passed to `plot_with_edos` method to customize the plot. 955 956 Return: |matplotlib-Figure| 957 """ 958 with self.nscf_task.open_gsr() as gsr: 959 gs_ebands = gsr.ebands 960 961 with self.dos_tasks[dos_pos].open_gsr() as gsr: 962 dos_ebands = gsr.ebands 963 964 edos = dos_ebands.get_edos(method=method, step=step, width=width) 965 return gs_ebands.plot_with_edos(edos, **kwargs) 966 967 def plot_edoses(self, dos_pos=None, method="gaussian", step=0.01, width=0.1, **kwargs): 968 """ 969 Plot the band structure and the DOS. 970 971 Args: 972 dos_pos: Index of the task from which the DOS should be obtained. 973 None is all DOSes should be displayed. Accepts integer or list of integers. 974 method: String defining the method for the computation of the DOS. 975 step: Energy step (eV) of the linear mesh. 976 width: Standard deviation (eV) of the gaussian. 977 kwargs: Keyword arguments passed to `plot` method to customize the plot. 978 979 Return: |matplotlib-Figure| 980 """ 981 if dos_pos is not None and not isinstance(dos_pos, (list, tuple)): dos_pos = [dos_pos] 982 983 from abipy.electrons.ebands import ElectronDosPlotter 984 plotter = ElectronDosPlotter() 985 for i, task in enumerate(self.dos_tasks): 986 if dos_pos is not None and i not in dos_pos: continue 987 with task.open_gsr() as gsr: 988 edos = gsr.ebands.get_edos(method=method, step=step, width=width) 989 ngkpt = task.get_inpvar("ngkpt") 990 plotter.add_edos("ngkpt %s" % str(ngkpt), edos) 991 992 return plotter.combiplot(**kwargs) 993 994 995class RelaxWork(Work): 996 """ 997 Work for structural relaxations. The first task relaxes the atomic position 998 while keeping the unit cell parameters fixed. The second task uses the final 999 structure to perform a structural relaxation in which both the atomic positions 1000 and the lattice parameters are optimized. 1001 1002 .. rubric:: Inheritance Diagram 1003 .. inheritance-diagram:: RelaxWork 1004 """ 1005 def __init__(self, ion_input, ioncell_input, workdir=None, manager=None, target_dilatmx=None): 1006 """ 1007 Args: 1008 ion_input: Input for the relaxation of the ions (cell is fixed) 1009 ioncell_input: Input for the relaxation of the ions and the unit cell. 1010 workdir: Working directory. 1011 manager: |TaskManager| object. 1012 """ 1013 super().__init__(workdir=workdir, manager=manager) 1014 1015 self.ion_task = self.register_relax_task(ion_input) 1016 1017 # Note: 1018 # 1) It would be nice to restart from the WFK file but ABINIT crashes due to the 1019 # different unit cell parameters if paral_kgb == 1 1020 #paral_kgb = ion_input[0]["paral_kgb"] 1021 #if paral_kgb == 1: 1022 1023 #deps = {self.ion_task: "WFK"} # --> FIXME: Problem in rwwf 1024 #deps = {self.ion_task: "DEN"} 1025 deps = None 1026 1027 self.ioncell_task = self.register_relax_task(ioncell_input, deps=deps) 1028 1029 # Lock ioncell_task as ion_task should communicate to ioncell_task that 1030 # the calculation is OK and pass the final structure. 1031 self.ioncell_task.lock(source_node=self) 1032 self.transfer_done = False 1033 1034 self.target_dilatmx = target_dilatmx 1035 1036 #@check_spectator 1037 def on_ok(self, sender): 1038 """ 1039 This callback is called when one task reaches status S_OK. 1040 If sender == self.ion_task, we update the initial structure 1041 used by self.ioncell_task and we unlock it so that the job can be submitted. 1042 """ 1043 self.history.debug("In on_ok with sender %s" % sender) 1044 1045 if sender == self.ion_task and not self.transfer_done: 1046 # Get the relaxed structure from ion_task 1047 ion_structure = self.ion_task.get_final_structure() 1048 1049 # Transfer it to the ioncell task (we do it only once). 1050 self.ioncell_task._change_structure(ion_structure) 1051 self.transfer_done = True 1052 1053 # Unlock ioncell_task so that we can submit it. 1054 self.ioncell_task.unlock(source_node=self) 1055 1056 elif sender == self.ioncell_task and self.target_dilatmx: 1057 actual_dilatmx = self.ioncell_task.get_inpvar('dilatmx', 1.) 1058 if self.target_dilatmx < actual_dilatmx: 1059 self.ioncell_task.reduce_dilatmx(target=self.target_dilatmx) 1060 self.history.info('Converging dilatmx. Value reduce from {} to {}.' 1061 .format(actual_dilatmx, self.ioncell_task.get_inpvar('dilatmx'))) 1062 self.ioncell_task.restart() 1063 1064 return super().on_ok(sender) 1065 1066 def plot_ion_relaxation(self, **kwargs): 1067 """ 1068 Plot the history of the ion-cell relaxation. 1069 kwargs are passed to the plot method of |HistFile| 1070 1071 Return: |matplotlib-Figure| or None if hist file is not found. 1072 """ 1073 with self.ion_task.open_hist() as hist: 1074 return hist.plot(**kwargs) if hist else None 1075 1076 def plot_ioncell_relaxation(self, **kwargs): 1077 """ 1078 Plot the history of the ion-cell relaxation. 1079 kwargs are passed to the plot method of |HistFile| 1080 1081 Return: |matplotlib-Figure| or None if hist file is not found. 1082 """ 1083 with self.ioncell_task.open_hist() as hist: 1084 return hist.plot(**kwargs) if hist else None 1085 1086 1087class G0W0Work(Work): 1088 """ 1089 Work for general G0W0 calculations. 1090 All input can be either single inputs or lists of inputs 1091 1092 .. rubric:: Inheritance Diagram 1093 .. inheritance-diagram:: G0W0Work 1094 """ 1095 def __init__(self, scf_inputs, nscf_inputs, scr_inputs, sigma_inputs, 1096 workdir=None, manager=None): 1097 """ 1098 Args: 1099 scf_inputs: Input(s) for the SCF run, if it is a list add all but only link 1100 to the last input (used for convergence studies on the KS band gap) 1101 nscf_inputs: Input(s) for the NSCF run, if it is a list add all but only 1102 link to the last (i.e. addditiona DOS and BANDS) 1103 scr_inputs: Input for the screening run 1104 sigma_inputs: List of |AbinitInput| for the self-energy run. 1105 if scr and sigma are lists of the same length, every sigma gets its own screening. 1106 if there is only one screening all sigma inputs are linked to this one 1107 workdir: Working directory of the calculation. 1108 manager: |TaskManager| object. 1109 """ 1110 super().__init__(workdir=workdir, manager=manager) 1111 1112 spread_scr = (isinstance(sigma_inputs, (list, tuple)) and 1113 isinstance(scr_inputs, (list, tuple)) and 1114 len(sigma_inputs) == len(scr_inputs)) 1115 #print("spread_scr", spread_scr) 1116 1117 self.sigma_tasks = [] 1118 1119 # Register the GS-SCF run. 1120 # register all scf_inputs but link the nscf only the last scf in the list 1121 # multiple scf_inputs can be provided to perform convergence studies 1122 if isinstance(scf_inputs, (list, tuple)): 1123 for scf_input in scf_inputs: 1124 self.scf_task = self.register_scf_task(scf_input) 1125 else: 1126 self.scf_task = self.register_scf_task(scf_inputs) 1127 1128 # Register the NSCF run (s). 1129 if isinstance(nscf_inputs, (list, tuple)): 1130 for nscf_input in nscf_inputs: 1131 self.nscf_task = nscf_task = self.register_nscf_task(nscf_input, deps={self.scf_task: "DEN"}) 1132 else: 1133 self.nscf_task = nscf_task = self.register_nscf_task(nscf_inputs, deps={self.scf_task: "DEN"}) 1134 1135 # Register the SCR and SIGMA run(s). 1136 if spread_scr: 1137 for scr_input, sigma_input in zip(scr_inputs, sigma_inputs): 1138 scr_task = self.register_scr_task(scr_input, deps={nscf_task: "WFK"}) 1139 sigma_task = self.register_sigma_task(sigma_input, deps={nscf_task: "WFK", scr_task: "SCR"}) 1140 self.sigma_tasks.append(sigma_task) 1141 else: 1142 # Sigma work(s) connected to the same screening. 1143 scr_task = self.register_scr_task(scr_inputs, deps={nscf_task: "WFK"}) 1144 if isinstance(sigma_inputs, (list, tuple)): 1145 for inp in sigma_inputs: 1146 task = self.register_sigma_task(inp, deps={nscf_task: "WFK", scr_task: "SCR"}) 1147 self.sigma_tasks.append(task) 1148 else: 1149 task = self.register_sigma_task(sigma_inputs, deps={nscf_task: "WFK", scr_task: "SCR"}) 1150 self.sigma_tasks.append(task) 1151 1152 1153class SigmaConvWork(Work): 1154 """ 1155 Work for self-energy convergence studies. 1156 1157 .. rubric:: Inheritance Diagram 1158 .. inheritance-diagram:: SigmaConvWork 1159 """ 1160 def __init__(self, wfk_node, scr_node, sigma_inputs, workdir=None, manager=None): 1161 """ 1162 Args: 1163 wfk_node: The node who has produced the WFK file or filepath pointing to the WFK file. 1164 scr_node: The node who has produced the SCR file or filepath pointing to the SCR file. 1165 sigma_inputs: List of |AbinitInput| for the self-energy runs. 1166 workdir: Working directory of the calculation. 1167 manager: |TaskManager| object. 1168 """ 1169 # Cast to node instances. 1170 wfk_node, scr_node = Node.as_node(wfk_node), Node.as_node(scr_node) 1171 1172 super().__init__(workdir=workdir, manager=manager) 1173 1174 # Register the SIGMA runs. 1175 if not isinstance(sigma_inputs, (list, tuple)): 1176 sigma_inputs = [sigma_inputs] 1177 1178 for sigma_input in sigma_inputs: 1179 self.register_sigma_task(sigma_input, deps={wfk_node: "WFK", scr_node: "SCR"}) 1180 1181 1182class BseMdfWork(Work): 1183 """ 1184 Work for simple BSE calculations in which the self-energy corrections 1185 are approximated by the scissors operator and the screening is modeled 1186 with the model dielectric function. 1187 1188 .. rubric:: Inheritance Diagram 1189 .. inheritance-diagram:: BseMdfWork 1190 """ 1191 def __init__(self, scf_input, nscf_input, bse_inputs, workdir=None, manager=None): 1192 """ 1193 Args: 1194 scf_input: Input for the SCF run. 1195 nscf_input: Input for the NSCF run. 1196 bse_inputs: List of Inputs for the BSE run. 1197 workdir: Working directory of the calculation. 1198 manager: |TaskManager| 1199 """ 1200 super().__init__(workdir=workdir, manager=manager) 1201 1202 # Register the GS-SCF run. 1203 self.scf_task = self.register_scf_task(scf_input) 1204 1205 # Construct the input for the NSCF run. 1206 self.nscf_task = self.register_nscf_task(nscf_input, deps={self.scf_task: "DEN"}) 1207 1208 # Construct the input(s) for the BSE run. 1209 if not isinstance(bse_inputs, (list, tuple)): 1210 bse_inputs = [bse_inputs] 1211 1212 for bse_input in bse_inputs: 1213 self.register_bse_task(bse_input, deps={self.nscf_task: "WFK"}) 1214 1215 def get_mdf_robot(self): 1216 """Builds and returns a :class:`MdfRobot` for analyzing the results in the MDF files.""" 1217 from abilab.robots import MdfRobot 1218 robot = MdfRobot() 1219 for task in self[2:]: 1220 mdf_path = task.outdir.has_abiext(robot.EXT) 1221 if mdf_path: 1222 robot.add_file(str(task), mdf_path) 1223 return robot 1224 1225 1226class QptdmWork(Work): 1227 """ 1228 This work parallelizes the calculation of the q-points of the screening. 1229 It also provides the callback `on_all_ok` that calls mrgscr to merge 1230 all the partial screening files produced. 1231 1232 .. rubric:: Inheritance Diagram 1233 .. inheritance-diagram:: QptdmWork 1234 """ 1235 def create_tasks(self, wfk_file, scr_input): 1236 """ 1237 Create the SCR tasks and register them in self. 1238 1239 Args: 1240 wfk_file: Path to the ABINIT WFK file to use for the computation of the screening. 1241 scr_input: Input for the screening calculation. 1242 """ 1243 assert len(self) == 0 1244 wfk_file = self.wfk_file = os.path.abspath(wfk_file) 1245 1246 # Build a temporary work in the tmpdir that will use a shell manager 1247 # to run ABINIT in order to get the list of q-points for the screening. 1248 shell_manager = self.manager.to_shell_manager(mpi_procs=1) 1249 1250 w = Work(workdir=self.tmpdir.path_join("_qptdm_run"), manager=shell_manager) 1251 1252 fake_input = scr_input.deepcopy() 1253 fake_task = w.register(fake_input) 1254 w.allocate() 1255 w.build() 1256 1257 # Create the symbolic link and add the magic value 1258 # nqpdm = -1 to the input to get the list of q-points. 1259 fake_task.inlink_file(wfk_file) 1260 fake_task.set_vars({"nqptdm": -1}) 1261 fake_task.start_and_wait() 1262 1263 # Parse the section with the q-points 1264 with NetcdfReader(fake_task.outdir.has_abiext("qptdms.nc")) as reader: 1265 qpoints = reader.read_value("reduced_coordinates_of_kpoints") 1266 1267 # Now we can register the task for the different q-points 1268 for qpoint in qpoints: 1269 qptdm_input = scr_input.deepcopy() 1270 qptdm_input.set_vars(nqptdm=1, qptdm=qpoint) 1271 new_task = self.register_scr_task(qptdm_input, manager=self.manager) 1272 # Add the garbage collector. 1273 if self.flow.gc is not None: 1274 new_task.set_gc(self.flow.gc) 1275 1276 self.allocate() 1277 1278 def merge_scrfiles(self, remove_scrfiles=True): 1279 """ 1280 This method is called when all the q-points have been computed. 1281 It runs `mrgscr` in sequential on the local machine to produce 1282 the final SCR file in the outdir of the `Work`. 1283 If remove_scrfiles is True, the partial SCR files are removed after the merge. 1284 """ 1285 scr_files = list(filter(None, [task.outdir.has_abiext("SCR") for task in self])) 1286 1287 self.history.info("Will call mrgscr to merge %s SCR files:\n" % len(scr_files)) 1288 assert len(scr_files) == len(self) 1289 1290 mrgscr = wrappers.Mrgscr(manager=self[0].manager, verbose=1) 1291 final_scr = mrgscr.merge_qpoints(self.outdir.path, scr_files, out_prefix="out") 1292 1293 if remove_scrfiles: 1294 for scr_file in scr_files: 1295 try: 1296 os.remove(scr_file) 1297 except IOError: 1298 pass 1299 1300 return final_scr 1301 1302 #@check_spectator 1303 def on_all_ok(self): 1304 """ 1305 This method is called when all the q-points have been computed. 1306 It runs `mrgscr` in sequential on the local machine to produce 1307 the final SCR file in the outdir of the `Work`. 1308 """ 1309 final_scr = self.merge_scrfiles() 1310 return self.Results(node=self, returncode=0, message="mrgscr done", final_scr=final_scr) 1311 1312# TODO: MergeDdb --> DfptWork(Work) postpone it because it may break pickle. 1313 1314 1315class MergeDdb(object): 1316 """ 1317 Mixin class for Works that have to merge the DDB files produced by the tasks. 1318 """ 1319 1320 def add_becs_from_scf_task(self, scf_task, ddk_tolerance, ph_tolerance): 1321 """ 1322 Build tasks for the computation of Born effective charges and add them to the work. 1323 1324 Args: 1325 scf_task: |ScfTask| object. 1326 ddk_tolerance: dict {"varname": value} with the tolerance used in the DDK run. None to use AbiPy default. 1327 ph_tolerance: dict {"varname": value} with the tolerance used in the phonon run. None to use AbiPy default. 1328 1329 Return: (ddk_tasks, bec_tasks) 1330 """ 1331 if not isinstance(scf_task, ScfTask): 1332 raise TypeError("task `%s` does not inherit from ScfTask" % scf_task) 1333 1334 # DDK calculations (self-consistent to get electric field). 1335 multi_ddk = scf_task.input.make_ddk_inputs(tolerance=ddk_tolerance) 1336 1337 ddk_tasks = [] 1338 for ddk_inp in multi_ddk: 1339 ddk_task = self.register_ddk_task(ddk_inp, deps={scf_task: "WFK"}) 1340 ddk_tasks.append(ddk_task) 1341 1342 # Build the list of inputs for electric field perturbation and phonons 1343 # Each BEC task is connected to all the previous DDK task and to the scf_task. 1344 bec_deps = {ddk_task: "DDK" for ddk_task in ddk_tasks} 1345 bec_deps.update({scf_task: "WFK"}) 1346 1347 bec_inputs = scf_task.input.make_bec_inputs(tolerance=ph_tolerance) 1348 bec_tasks = [] 1349 for bec_inp in bec_inputs: 1350 bec_task = self.register_bec_task(bec_inp, deps=bec_deps) 1351 bec_tasks.append(bec_task) 1352 1353 return ddk_tasks, bec_tasks 1354 1355 def merge_ddb_files(self, delete_source_ddbs=False, only_dfpt_tasks=True, 1356 exclude_tasks=None, include_tasks=None): 1357 """ 1358 This method is called when all the q-points have been computed. 1359 It runs `mrgddb` in sequential on the local machine to produce 1360 the final DDB file in the outdir of the `Work`. 1361 1362 Args: 1363 delete_source_ddbs: True if input DDB should be removed once final DDB is created. 1364 only_dfpt_tasks: False to merge all DDB files produced by the tasks of the work 1365 Useful e.g. for finite stress corrections in which the stress in the 1366 initial configuration should be merged in the final DDB. 1367 exclude_tasks: List of tasks that should be excluded when merging the partial DDB files. 1368 include_tasks: List of tasks that should be included when merging the partial DDB files. 1369 Mutually exclusive with exclude_tasks. 1370 1371 Returns: 1372 path to the output DDB file 1373 """ 1374 if exclude_tasks: 1375 my_tasks = [task for task in self if task not in exclude_tasks] 1376 elif include_tasks: 1377 my_tasks = [task for task in self if task in include_tasks] 1378 else: 1379 my_tasks = [task for task in self] 1380 1381 if only_dfpt_tasks: 1382 ddb_files = list(filter(None, [task.outdir.has_abiext("DDB") for task in my_tasks 1383 if isinstance(task, DfptTask)])) 1384 else: 1385 ddb_files = list(filter(None, [task.outdir.has_abiext("DDB") for task in my_tasks])) 1386 1387 self.history.info("Will call mrgddb to merge %s DDB files:" % len(ddb_files)) 1388 # DDB files are always produces so this should never happen! 1389 if not ddb_files: 1390 raise RuntimeError("Cannot find any DDB file to merge by the task of " % self) 1391 1392 # Final DDB file will be produced in the outdir of the work. 1393 out_ddb = self.outdir.path_in("out_DDB") 1394 1395 if len(ddb_files) == 1: 1396 # Avoid the merge. Just copy the DDB file to the outdir of the work. 1397 shutil.copy(ddb_files[0], out_ddb) 1398 else: 1399 # Call mrgddb 1400 desc = "DDB file merged by %s on %s" % (self.__class__.__name__, time.asctime()) 1401 mrgddb = wrappers.Mrgddb(manager=self[0].manager, verbose=0) 1402 mrgddb.merge(self.outdir.path, ddb_files, out_ddb=out_ddb, description=desc, 1403 delete_source_ddbs=delete_source_ddbs) 1404 1405 return out_ddb 1406 1407 def merge_pot1_files(self, delete_source=False): 1408 """ 1409 This method is called when all the q-points have been computed. 1410 It runs `mrgdvdb` in sequential on the local machine to produce 1411 the final DVDB file in the outdir of the `Work`. 1412 1413 Args: 1414 delete_source: True if POT1 files should be removed after (successful) merge. 1415 1416 Returns: path to the output DVDB file. None if not DFPT POT file is found. 1417 """ 1418 natom = len(self[0].input.structure) 1419 max_pertcase = 3 * natom 1420 1421 pot1_files = [] 1422 for task in self: 1423 if not isinstance(task, DfptTask): continue 1424 paths = task.outdir.list_filepaths(wildcard="*_POT*") 1425 for path in paths: 1426 # Include only atomic perturbations i.e. files whose ext <= 3 * natom 1427 i = path.rindex("_POT") 1428 pertcase = int(path[i+4:].replace(".nc", "")) 1429 if pertcase <= max_pertcase: 1430 pot1_files.append(path) 1431 1432 # prtpot = 0 disables the output of the DFPT POT files so an empty list is not fatal here. 1433 if not pot1_files: return None 1434 1435 self.history.info("Will call mrgdvdb to merge %s files:" % len(pot1_files)) 1436 1437 # Final DDB file will be produced in the outdir of the work. 1438 out_dvdb = self.outdir.path_in("out_DVDB") 1439 1440 if len(pot1_files) == 1: 1441 # Avoid the merge. Just move the DDB file to the outdir of the work 1442 shutil.copy(pot1_files[0], out_dvdb) 1443 else: 1444 # FIXME: The merge may require a non-negligible amount of memory if lots of qpts. 1445 # Besides there are machines such as lemaitre3 that are problematic when 1446 # running MPI applications on the front-end 1447 mrgdvdb = wrappers.Mrgdvdb(manager=self[0].manager, verbose=0) 1448 mrgdvdb.merge(self.outdir.path, pot1_files, out_dvdb, delete_source=delete_source) 1449 1450 return out_dvdb 1451 1452 1453class PhononWork(Work, MergeDdb): 1454 """ 1455 This work consists of nirred Phonon tasks where nirred is 1456 the number of irreducible atomic perturbations for a given set of q-points. 1457 It provides the callback method (on_all_ok) that calls mrgddb (mrgdv) to merge 1458 all the partial DDB (POT) files produced. The two files are available in the 1459 output directory of the Work. 1460 1461 .. rubric:: Inheritance Diagram 1462 .. inheritance-diagram:: PhononWork 1463 """ 1464 1465 @classmethod 1466 def from_scf_task(cls, scf_task, qpoints, is_ngqpt=False, tolerance=None, with_becs=False, 1467 ddk_tolerance=None, manager=None): 1468 """ 1469 Construct a `PhononWork` from a |ScfTask| object. 1470 The input file for phonons is automatically generated from the input of the ScfTask. 1471 Each phonon task depends on the WFK file produced by the `scf_task`. 1472 1473 Args: 1474 scf_task: |ScfTask| object. 1475 qpoints: q-points in reduced coordinates. Accepts single q-point, list of q-points 1476 or three integers defining the q-mesh if `is_ngqpt`. 1477 is_ngqpt: True if `qpoints` should be interpreted as divisions instead of q-points. 1478 tolerance: dict {"varname": value} with the tolerance to be used in the phonon run. 1479 None to use AbiPy default. 1480 with_becs: Activate calculation of Electric field and Born effective charges. 1481 ddk_tolerance: dict {"varname": value} with the tolerance used in the DDK run if with_becs. 1482 None to use AbiPy default. 1483 manager: |TaskManager| object. 1484 """ 1485 if not isinstance(scf_task, ScfTask): 1486 raise TypeError("task `%s` does not inherit from ScfTask" % scf_task) 1487 1488 if is_ngqpt: 1489 qpoints = scf_task.input.abiget_ibz(ngkpt=qpoints, shiftk=[0, 0, 0], kptopt=1).points 1490 qpoints = np.reshape(qpoints, (-1, 3)) 1491 1492 new = cls(manager=manager) 1493 if with_becs: 1494 new.add_becs_from_scf_task(scf_task, ddk_tolerance, ph_tolerance=tolerance) 1495 1496 for qpt in qpoints: 1497 if with_becs and np.sum(qpt ** 2) < 1e-12: continue 1498 multi = scf_task.input.make_ph_inputs_qpoint(qpt, tolerance=tolerance) 1499 for ph_inp in multi: 1500 new.register_phonon_task(ph_inp, deps={scf_task: "WFK"}) 1501 1502 return new 1503 1504 @classmethod 1505 def from_scf_input(cls, scf_input, qpoints, is_ngqpt=False, tolerance=None, 1506 with_becs=False, ddk_tolerance=None, manager=None): 1507 """ 1508 Similar to `from_scf_task`, the difference is that this method requires 1509 an input for SCF calculation. A new |ScfTask| is created and added to the Work. 1510 This API should be used if the DDB of the GS task should be merged. 1511 """ 1512 if is_ngqpt: 1513 qpoints = scf_input.abiget_ibz(ngkpt=qpoints, shiftk=[0, 0, 0], kptopt=1).points 1514 1515 qpoints = np.reshape(qpoints, (-1, 3)) 1516 1517 new = cls(manager=manager) 1518 # Create ScfTask 1519 scf_task = new.register_scf_task(scf_input) 1520 1521 if with_becs: 1522 new.add_becs_from_scf_task(scf_task, ddk_tolerance, ph_tolerance=tolerance) 1523 1524 for qpt in qpoints: 1525 if with_becs and np.sum(qpt ** 2) < 1e-12: continue 1526 multi = scf_task.input.make_ph_inputs_qpoint(qpt, tolerance=tolerance) 1527 for ph_inp in multi: 1528 new.register_phonon_task(ph_inp, deps={scf_task: "WFK"}) 1529 1530 return new 1531 1532 #@check_spectator 1533 def on_all_ok(self): 1534 """ 1535 This method is called when all the q-points have been computed. 1536 Ir runs `mrgddb` in sequential on the local machine to produce 1537 the final DDB file in the outdir of the |Work|. 1538 """ 1539 # Merge DDB files. 1540 out_ddb = self.merge_ddb_files() 1541 # Merge DVDB files. 1542 out_dvdb = self.merge_pot1_files() 1543 1544 return self.Results(node=self, returncode=0, message="DDB merge done") 1545 1546 1547class PhononWfkqWork(Work, MergeDdb): 1548 """ 1549 This work computes phonons with DFPT on an arbitrary q-mesh (usually denser than the k-mesh for electrons) 1550 by computing WKQ files for each q-point. 1551 The number of irreducible atomic perturbations for each q-point is taken into account. 1552 It provides the callback method (on_all_ok) that calls mrgddb (mrgdv) to merge 1553 all the partial DDB (POT) files produced. The two files are available in the 1554 output directory of the Work. The WKQ files are removed at runtime. 1555 1556 .. rubric:: Inheritance Diagram 1557 .. inheritance-diagram:: PhononWfkqWork 1558 """ 1559 1560 @classmethod 1561 def from_scf_task(cls, scf_task, ngqpt, ph_tolerance=None, tolwfr=1.0e-22, nband=None, 1562 with_becs=False, ddk_tolerance=None, shiftq=(0, 0, 0), is_ngqpt=True, remove_wfkq=True, 1563 prepgkk=0, manager=None): 1564 """ 1565 Construct a `PhononWfkqWork` from a |ScfTask| object. 1566 The input files for WFQ and phonons are automatically generated from the input of the ScfTask. 1567 Each phonon task depends on the WFK file produced by scf_task and the associated WFQ file. 1568 1569 Args: 1570 scf_task: |ScfTask| object. 1571 ngqpt: three integers defining the q-mesh 1572 with_becs: Activate calculation of Electric field and Born effective charges. 1573 ph_tolerance: dict {"varname": value} with the tolerance for the phonon run. 1574 None to use AbiPy default. 1575 tolwfr: tolerance used to compute WFQ. 1576 ddk_tolerance: dict {"varname": value} with the tolerance used in the DDK run if with_becs. 1577 None to use AbiPy default. 1578 shiftq: Q-mesh shift. Multiple shifts are not supported. 1579 is_ngqpt: the ngqpt is interpreted as a set of integers defining the q-mesh, otherwise 1580 is an explicit list of q-points 1581 remove_wfkq: Remove WKQ files when the children are completed. 1582 prepgkk: 1 to activate computation of all 3*natom perts (debugging option). 1583 manager: |TaskManager| object. 1584 1585 .. note: 1586 1587 Use k-meshes with one shift and q-meshes that are multiple of ngkpt 1588 to decrease the number of WFQ files to be computed. 1589 """ 1590 if not isinstance(scf_task, ScfTask): 1591 raise TypeError("task `%s` does not inherit from ScfTask" % scf_task) 1592 1593 shiftq = np.reshape(shiftq, (3, )) 1594 #print("ngqpt", ngqpt, "\nshiftq", shiftq) 1595 if is_ngqpt: 1596 qpoints = scf_task.input.abiget_ibz(ngkpt=ngqpt, shiftk=shiftq, kptopt=1).points 1597 else: 1598 qpoints = np.reshape(ngqpt, (-1, 3)) 1599 1600 new = cls(manager=manager) 1601 new.remove_wfkq = remove_wfkq 1602 new.phonon_tasks = [] 1603 new.wfkq_tasks = [] 1604 new.wfkq_task_children = collections.defaultdict(list) 1605 1606 if with_becs: 1607 # Add DDK and BECS. 1608 new.add_becs_from_scf_task(scf_task, ddk_tolerance, ph_tolerance) 1609 1610 # Get ngkpt, shift for electrons from input. 1611 # Won't try to skip WFQ if multiple shifts or off-diagonal kptrlatt 1612 ngkpt, shiftk = scf_task.input.get_ngkpt_shiftk() 1613 #try_to_skip_wfkq = True 1614 #if ngkpt is not None and len(shiftk) == 1 and not is_ngqpt: 1615 # try_to_skip_wfkq = True 1616 try_to_skip_wfkq = False 1617 1618 # TODO: One could avoid kptopt 3 by computing WFK in the IBZ and then rotating. 1619 # but this has to be done inside Abinit. 1620 for qpt in qpoints: 1621 is_gamma = np.sum(qpt ** 2) < 1e-12 1622 if with_becs and is_gamma: continue 1623 1624 # Avoid WFQ if k + q = k (requires ngkpt, multiple shifts are not supported) 1625 need_wfkq = True 1626 if is_gamma: 1627 need_wfkq = False 1628 elif try_to_skip_wfkq: 1629 # k = (i + shiftk) / ngkpt 1630 qinds = np.rint(qpt * ngqpt - shiftq) 1631 #print("qpt", qpt, "\nqinds", qinds, "\nngkpt", ngkpt, "\nngqpt", ngqpt) 1632 f = (qinds * ngkpt) % ngqpt 1633 need_wfkq = np.any(f != 0) 1634 1635 #neee_wfkq = True 1636 1637 if need_wfkq: 1638 nscf_inp = scf_task.input.new_with_vars(qpt=qpt, nqpt=1, iscf=-2, kptopt=3, tolwfr=tolwfr) 1639 if nband: 1640 nbdbuf = max(2, nband*0.1) 1641 nscf_inp.set_vars(nband=nband+nbdbuf, nbdbuf=nbdbuf) 1642 wfkq_task = new.register_nscf_task(nscf_inp, deps={scf_task: ["DEN", "WFK"]}) 1643 new.wfkq_tasks.append(wfkq_task) 1644 1645 multi = scf_task.input.make_ph_inputs_qpoint(qpt, tolerance=ph_tolerance, prepgkk=prepgkk) 1646 for ph_inp in multi: 1647 deps = {scf_task: "WFK", wfkq_task: "WFQ"} if need_wfkq else {scf_task: "WFK"} 1648 #ph_inp["prtwf"] = -1 1649 t = new.register_phonon_task(ph_inp, deps=deps) 1650 new.phonon_tasks.append(t) 1651 if need_wfkq: 1652 new.wfkq_task_children[wfkq_task].append(t) 1653 1654 return new 1655 1656 def on_ok(self, sender): 1657 """ 1658 This callback is called when one task reaches status `S_OK`. 1659 It removes the WFKQ file if all its children have reached `S_OK`. 1660 """ 1661 if self.remove_wfkq: 1662 for task in self.wfkq_tasks: 1663 if task.status != task.S_OK: continue 1664 children = self.wfkq_task_children[task] 1665 if all(child.status == child.S_OK for child in children): 1666 path = task.outdir.has_abiext("WFQ") 1667 if path: 1668 self.history.info("Removing WFQ: %s" % path) 1669 os.remove(path) 1670 1671 return super().on_ok(sender) 1672 1673 #@check_spectator 1674 def on_all_ok(self): 1675 """ 1676 This method is called when all the q-points have been computed. 1677 Ir runs `mrgddb` in sequential on the local machine to produce 1678 the final DDB file in the outdir of the |Work|. 1679 """ 1680 # Merge DDB files. 1681 out_ddb = self.merge_ddb_files() 1682 1683 # Merge DVDB files. 1684 out_dvdb = self.merge_pot1_files() 1685 1686 return self.Results(node=self, returncode=0, message="DDB merge done") 1687 1688 1689class GKKPWork(Work): 1690 """ 1691 This work computes electron-phonon matrix elements for all the q-points 1692 present in a DVDB and DDB file 1693 1694 .. rubric:: Inheritance Diagram 1695 .. inheritance-diagram:: GKKPWork 1696 """ 1697 @classmethod 1698 def from_den_ddb_dvdb(cls, inp, den_path, ddb_path, dvdb_path, mpiprocs=1, remove_wfkq=True, 1699 qpath=None, with_ddk=True, expand=True, manager=None): 1700 """ 1701 Construct a `PhononWfkqWork` from a DDB and DVDB file. 1702 For each q found, a WFQ task and an EPH task computing the matrix elements are created. 1703 """ 1704 import abipy.abilab as abilab 1705 1706 # Create file nodes 1707 den_file = FileNode(den_path) 1708 ddb_file = FileNode(ddb_path) 1709 dvdb_file = FileNode(dvdb_path) 1710 1711 # Create new work 1712 new = cls(manager=manager) 1713 new.remove_wfkq = remove_wfkq 1714 new.wfkq_tasks = [] 1715 new.wfkq_task_children = collections.defaultdict(list) 1716 if manager is None: manager = TaskManager.from_user_config() 1717 tm = manager.new_with_fixed_mpi_omp(mpiprocs, 1) 1718 1719 # Create a WFK task 1720 kptopt = 1 if expand else 3 1721 nscf_inp = inp.new_with_vars(iscf=-2, kptopt=kptopt) 1722 wfk_task = new.register_nscf_task(nscf_inp, deps={den_file: "DEN"},manager=tm) 1723 new.wfkq_tasks.append(wfk_task) 1724 new.wfk_task = wfk_task 1725 1726 # Read path and regular grid from DDB file 1727 with abilab.abiopen(ddb_path) as ddb: 1728 q_frac_coords = np.array([k.frac_coords for k in ddb.qpoints]) 1729 ddb_ngqpt = ddb.guessed_ngqpt 1730 1731 # If qpath is set, we read the list of q-points to be used to interpolate the DVDB file. 1732 # The DVDB and DDB file have to correspond to a regular grid. 1733 dvdb = dvdb_file 1734 if qpath is None: 1735 qpath = q_frac_coords 1736 else: 1737 interp_inp = inp.new_with_vars(optdriver=7, eph_task=-5, ddb_ngqpt=ddb_ngqpt, 1738 ph_nqpath=len(qpath), ph_qpath=qpath, prtphdos=0) 1739 dvdb = new.register_eph_task(interp_inp, deps={wfk_task: "WFK", ddb_file: "DDB", dvdb_file: "DVDB"}, 1740 manager=tm) 1741 1742 # Create a WFK expansion task 1743 if expand: 1744 fbz_nscf_inp = inp.new_with_vars(optdriver=8) 1745 fbz_nscf_inp.set_spell_check(False) 1746 fbz_nscf_inp.set_vars(wfk_task="wfk_fullbz") 1747 tm_serial = manager.new_with_fixed_mpi_omp(1,1) 1748 wfk_task = new.register_nscf_task(fbz_nscf_inp, deps={wfk_task: "WFK", den_file: "DEN"}, 1749 manager=tm_serial) 1750 new.wfkq_tasks.append(wfk_task) 1751 new.wfk_task = wfk_task 1752 1753 if with_ddk: 1754 kptopt = 3 if expand else 1 1755 ddk_inp = inp.new_with_vars(optdriver=8,kptopt=kptopt) 1756 ddk_inp.set_spell_check(False) 1757 ddk_inp.set_vars(wfk_task="wfk_ddk") 1758 ddk_task = new.register_nscf_task(ddk_inp, deps={wfk_task: "WFK", den_file: "DEN"}, manager=tm) 1759 new.wfkq_tasks.append(ddk_task) 1760 1761 # For each qpoint 1762 for qpt in qpath: 1763 is_gamma = np.sum(qpt ** 2) < 1e-12 1764 if is_gamma: 1765 # Create a link from WFK to WFQ on_ok 1766 wfkq_task = wfk_task 1767 deps = {wfk_task: ["WFK", "WFQ"], ddb_file: "DDB", dvdb: "DVDB"} 1768 else: 1769 # Create a WFQ task 1770 nscf_inp = nscf_inp.new_with_vars(kptopt=3, qpt=qpt, nqpt=1) 1771 wfkq_task = new.register_nscf_task(nscf_inp, deps={den_file: "DEN"}, manager=tm) 1772 new.wfkq_tasks.append(wfkq_task) 1773 deps = {wfk_task: "WFK", wfkq_task: "WFQ", ddb_file: "DDB", dvdb: "DVDB"} 1774 1775 # Create a EPH task 1776 eph_inp = inp.new_with_vars(optdriver=7, prtphdos=0, eph_task=-2, kptopt=3, 1777 ddb_ngqpt=[1, 1, 1], nqpt=1, qpt=qpt) 1778 t = new.register_eph_task(eph_inp, deps=deps, manager=tm) 1779 new.wfkq_task_children[wfkq_task].append(t) 1780 1781 return new 1782 1783 @classmethod 1784 def from_phononwfkq_work(cls, phononwfkq_work, nscf_vars={}, remove_wfkq=True, with_ddk=True, manager=None): 1785 """ 1786 Construct a `GKKPWork` from a `PhononWfkqWork` object. 1787 The WFQ are the ones used for PhononWfkqWork so in principle have only valence bands 1788 """ 1789 # Get list of qpoints from the the phonon tasks in this work 1790 qpoints = [] 1791 qpoints_deps = [] 1792 for task in phononwfkq_work: 1793 if isinstance(task,PhononTask): 1794 # Store qpoints 1795 qpt = task.input.get("qpt", [0, 0, 0]) 1796 qpoints.append(qpt) 1797 # Store dependencies 1798 qpoints_deps.append(task.deps) 1799 1800 # Create file nodes 1801 ddb_path = phononwfkq_work.outdir.has_abiext("DDB") 1802 dvdb_path = phononwfkq_work.outdir.has_abiext("DVDB") 1803 ddb_file = FileNode(ddb_path) 1804 dvdb_file = FileNode(dvdb_path) 1805 1806 # Get scf_task from first q-point 1807 for dep in qpoints_deps[0]: 1808 if isinstance(dep.node,ScfTask) and dep.exts[0] == 'WFK': 1809 scf_task = dep.node 1810 1811 # Create new work 1812 new = cls(manager=manager) 1813 new.remove_wfkq = remove_wfkq 1814 new.wfkq_tasks = [] 1815 new.wfk_task = [] 1816 1817 # Add one eph task per qpoint 1818 for qpt,qpoint_deps in zip(qpoints,qpoints_deps): 1819 # Create eph task 1820 eph_input = scf_task.input.new_with_vars(optdriver=7, prtphdos=0, eph_task=-2, 1821 ddb_ngqpt=[1, 1, 1], nqpt=1, qpt=qpt) 1822 deps = {ddb_file: "DDB", dvdb_file: "DVDB"} 1823 for dep in qpoint_deps: 1824 deps[dep.node] = dep.exts[0] 1825 # If no WFQ in deps link the WFK with WFQ extension 1826 if 'WFQ' not in deps.values(): 1827 inv_deps = dict((v, k) for k, v in deps.items()) 1828 wfk_task = inv_deps['WFK'] 1829 wfk_path = wfk_task.outdir.has_abiext("WFK") 1830 # Check if netcdf 1831 filename, extension = os.path.splitext(wfk_path) 1832 infile = 'out_WFQ' + extension 1833 wfq_path = os.path.join(os.path.dirname(wfk_path), infile) 1834 if not os.path.isfile(wfq_path): os.symlink(wfk_path, wfq_path) 1835 deps[FileNode(wfq_path)] = 'WFQ' 1836 new.register_eph_task(eph_input, deps=deps) 1837 1838 return new 1839 1840 def on_ok(self, sender): 1841 """ 1842 This callback is called when one task reaches status `S_OK`. 1843 It removes the WFKQ file if all its children have reached `S_OK`. 1844 """ 1845 if self.remove_wfkq: 1846 for task in self.wfkq_tasks: 1847 if task.status != task.S_OK: continue 1848 children = self.wfkq_task_children[task] 1849 if all(child.status == child.S_OK for child in children): 1850 path = task.outdir.has_abiext("WFQ") 1851 if path: 1852 self.history.info("Removing WFQ: %s" % path) 1853 os.remove(path) 1854 1855 # If wfk task we create a link to a wfq file so abinit is happy 1856 if sender == self.wfk_task: 1857 wfk_path = self.wfk_task.outdir.has_abiext("WFK") 1858 # Check if netcdf 1859 filename, extension = os.path.splitext(wfk_path) 1860 infile = 'out_WFQ' + extension 1861 infile = os.path.join(os.path.dirname(wfk_path), infile) 1862 os.symlink(wfk_path, infile) 1863 1864 return super().on_ok(sender) 1865 1866 1867class BecWork(Work, MergeDdb): 1868 """ 1869 Work for the computation of the Born effective charges. 1870 1871 This work consists of DDK tasks and phonon + electric field perturbation 1872 It provides the callback method (on_all_ok) that calls mrgddb to merge the 1873 partial DDB files produced by the work. 1874 1875 .. rubric:: Inheritance Diagram 1876 .. inheritance-diagram:: BecWork 1877 """ 1878 1879 @classmethod 1880 def from_scf_task(cls, scf_task, ddk_tolerance=None, ph_tolerance=None, manager=None): 1881 """ 1882 Build tasks for the computation of Born effective charges from a ground-state task. 1883 1884 Args: 1885 scf_task: |ScfTask| object. 1886 ddk_tolerance: tolerance used in the DDK run if with_becs. None to use AbiPy default. 1887 ph_tolerance: dict {"varname": value} with the tolerance used in the phonon run. 1888 None to use AbiPy default. 1889 manager: |TaskManager| object. 1890 """ 1891 new = cls(manager=manager) 1892 new.add_becs_from_scf_task(scf_task, ddk_tolerance, ph_tolerance) 1893 return new 1894 1895 def on_all_ok(self): 1896 """ 1897 This method is called when all tasks reach S_OK 1898 Ir runs `mrgddb` in sequential on the local machine to produce 1899 the final DDB file in the outdir of the |Work|. 1900 """ 1901 # Merge DDB files. 1902 out_ddb = self.merge_ddb_files() 1903 return self.Results(node=self, returncode=0, message="DDB merge done") 1904 1905 1906class DteWork(Work, MergeDdb): 1907 """ 1908 Work for the computation of the third derivative of the energy. 1909 1910 This work consists of DDK tasks and electric field perturbation. 1911 It provides the callback method (on_all_ok) that calls mrgddb to merge the partial DDB files produced 1912 1913 .. rubric:: Inheritance Diagram 1914 .. inheritance-diagram:: DteWork 1915 """ 1916 @classmethod 1917 def from_scf_task(cls, scf_task, ddk_tolerance=None, manager=None): 1918 """ 1919 Build a DteWork from a ground-state task. 1920 1921 Args: 1922 scf_task: |ScfTask| object. 1923 ddk_tolerance: tolerance used in the DDK run if with_becs. None to use AbiPy default. 1924 manager: |TaskManager| object. 1925 """ 1926 if not isinstance(scf_task, ScfTask): 1927 raise TypeError("task `%s` does not inherit from ScfTask" % scf_task) 1928 1929 new = cls(manager=manager) 1930 1931 # DDK calculations 1932 multi_ddk = scf_task.input.make_ddk_inputs(tolerance=ddk_tolerance) 1933 1934 ddk_tasks = [] 1935 for ddk_inp in multi_ddk: 1936 ddk_task = new.register_ddk_task(ddk_inp, deps={scf_task: "WFK"}) 1937 ddk_tasks.append(ddk_task) 1938 1939 # Build the list of inputs for electric field perturbation 1940 # Each task is connected to all the previous DDK, DDE task and to the scf_task. 1941 multi_dde = scf_task.input.make_dde_inputs(use_symmetries=False) 1942 1943 # To compute the nonlinear coefficients all the directions of the perturbation 1944 # have to be taken in consideration 1945 # DDE calculations 1946 dde_tasks = [] 1947 dde_deps = {ddk_task: "DDK" for ddk_task in ddk_tasks} 1948 dde_deps.update({scf_task: "WFK"}) 1949 for dde_inp in multi_dde: 1950 dde_task = new.register_dde_task(dde_inp, deps=dde_deps) 1951 dde_tasks.append(dde_task) 1952 1953 # DTE calculations 1954 dte_deps = {scf_task: "WFK DEN"} 1955 dte_deps.update({dde_task: "1WF 1DEN" for dde_task in dde_tasks}) 1956 1957 multi_dte = scf_task.input.make_dte_inputs() 1958 dte_tasks = [] 1959 for dte_inp in multi_dte: 1960 dte_task = new.register_dte_task(dte_inp, deps=dte_deps) 1961 dte_tasks.append(dte_task) 1962 1963 return new 1964 1965 def on_all_ok(self): 1966 """ 1967 This method is called when all tasks reach S_OK 1968 It runs `mrgddb` in sequential on the local machine to produce 1969 the final DDB file in the outdir of the `Work`. 1970 """ 1971 # Merge DDB files. 1972 out_ddb = self.merge_ddb_files() 1973 return self.Results(node=self, returncode=0, message="DDB merge done") 1974 1975 1976class ConducWork(Work): 1977 """ 1978 Workflow for the computation of electrical conductivity. 1979 1980 Can be called from : 1981 1. MultiDataset and PhononWork 1982 2. MultiDataset, DDB filepath and DVDB filepath. 1983 Can use Kerange Capability using withKerange=True 1984 1985 This work consists of 3 tasks or 5 tasks with kerange : 1986 1. SCF GS 1987 2. NSCF 1988 3. Kerange (Kerange only) 1989 4. WFK Interpolation (Kerange only) 1990 5. Electrical Conductivity Calculation. 1991 """ 1992 1993 @classmethod 1994 def from_phwork(cls, phwork, multi, nbr_proc=None, flow=None, with_kerange=False, 1995 omp_nbr_thread=1, manager=None): 1996 """ 1997 Construct a |ConducWork| from a |PhononWork| and |MultiDataset|. 1998 1999 Args: 2000 phwork: a |PhononWork| object calculating the DDB and DVDB files. 2001 multi: a |MultiDataset| object containing a list of 3 datasets or 5 with Kerange. 2002 See abipy/abio/factories.py -> conduc_from_scf_nscf_inputs for details about multi. 2003 nbr_proc: Required if with_kerange since autoparal doesn't work with optdriver=8. 2004 flow: The flow calling the work. Used for with_fixed_mpi_omp. 2005 with_kerange: True if using Kerange. 2006 omp_nbr_thread : Number of omp_thread to use. 2007 manager: |TaskManager| of the task. If None, the manager is initialized from the config file. 2008 """ 2009 # Verify phwork 2010 if not isinstance(phwork, PhononWork): 2011 raise TypeError("Work `%s` does not inherit from PhononWork" % phwork) 2012 # Verify Multi 2013 if (not with_kerange) and (multi.ndtset != 3): #Without kerange, multi should contain 3 datasets 2014 raise ValueError("""The |MultiDataset| object does not contain the expected number of dataset. 2015 It should have 3 datasets and it had `%s`. You should generate 2016 multi with the factory function conduc_from_scf_nscf_inputs""" % multi.ndtset) 2017 if (with_kerange) and (multi.ndtset != 5): # With Kerange, multi should contain 5 datasets 2018 raise ValueError("""The |MultiDataset| object does not contain the expected number of dataset. 2019 It should have 5 datasets and it had `%s`. You should generate 2020 multi with the factory function conduc_from_scf_nscf_inputs""" % multi.ndtset) 2021 # Verify nbr_proc and flow are defined if with_kerange 2022 if with_kerange and (flow is None or nbr_proc is None): 2023 raise ValueError("""When using kerange, the argument flow and nbr_proc must be passed to the function from_filepath 2024 flow = {}\n nbr_proc = {}""".format(flow, nbr_proc)) 2025 2026 new = cls(manager=manager) 2027 2028 new.register_task(multi[0]) 2029 new.register_task(multi[1], deps={new[0]: "DEN"}) 2030 taskNumber = 2 # To keep track of the task in new and multi 2031 2032 if(with_kerange): # Using Kerange 2033 new.register_task(multi[2], deps={new[1]: "WFK"}) 2034 new.register_task(multi[3], deps={new[0]: "DEN", new[1]: "WFK", new[2]: "KERANGE.nc"}) 2035 taskNumber = 4 # We have 2 more dataset 2036 2037 new.register_task(multi[taskNumber], deps={new[taskNumber-1]: "WFK", phwork: ["DDB","DVDB"]}) 2038 2039 for task in new: 2040 task.set_work(new) 2041 2042 # Manual Paralellization since autoparal doesn't work with optdriver=8 (t2) 2043 if flow is not None : 2044 new.set_flow(flow) 2045 if nbr_proc is not None: 2046 for task in new[2:]: 2047 task.with_fixed_mpi_omp(nbr_proc, omp_nbr_thread) 2048 return new 2049 2050 @classmethod 2051 def from_filepath(cls, ddb_path, dvdb_path, multi, nbr_proc=None, flow=None, 2052 with_kerange=False, omp_nbr_thread=1, manager=None): 2053 """ 2054 Construct a |ConducWork| from previously calculated DDB/DVDB file and |MultiDataset|. 2055 2056 Args: 2057 multi: a |MultiDataset| object containing a list of 3 datasets or 5 with Kerange. 2058 See abipy/abio/factories.py -> conduc_from_scf_nscf_inputs for details about multi. 2059 ddb_path: a string containing the path to the DDB file. 2060 dvdb_path: a string containing the path to the DVDB file. 2061 nbr_proc: Required if with_kerange since autoparal doesn't work with optdriver=8. 2062 flow: The flow calling the work, only needed if with_kerange. 2063 with_kerange: True if using Kerange. 2064 omp_nbr_thread : Number of omp_thread to use. 2065 manager: |TaskManager| of the task. If None, the manager is initialized from the config file. 2066 """ 2067 # Verify Multi 2068 if (not with_kerange) and (multi.ndtset != 3): #Without kerange, multi should contain 4 datasets 2069 raise ValueError("""The |MultiDataset| object does not contain the expected number of dataset. 2070 It should have 4 datasets and it had `%s`. You should generate 2071 multi with the factory function conduc_from_scf_nscf_inputs""" % multi.ndtset) 2072 if (with_kerange) and (multi.ndtset != 5): # With Kerange, multi should contain 6 datasets 2073 raise ValueError("""The |MultiDataset| object does not contain the expected number of dataset. 2074 It should have 6 datasets and it had `%s`.You should generate 2075 multi with the factory function conduc_from_scf_nscf_inputs""" % multi.ndtset) 2076 # Make sure both file exists 2077 if not os.path.exists(ddb_path): 2078 raise ValueError("The DDB file doesn't exists : `%s`" % ddb_path) 2079 if not os.path.exists(dvdb_path): 2080 raise ValueError("The DVDB file doesn't exists : `%s`" % dvdb_path) 2081 # Verify nbr_proc and flow are defined if with_kerange 2082 if with_kerange and (flow is None or nbr_proc is None): 2083 raise ValueError("""When using kerange, the argument flow and nbr_proc must be passed to the function from_filepath 2084 flow = {}, nbr_proc = {}""".format(flow, nbr_proc)) 2085 2086 new = cls(manager=manager) 2087 2088 new.register_task(multi[0]) 2089 new.register_task(multi[1], deps={new[0]: "DEN"}) 2090 taskNumber = 2 # To keep track of the task in new and multi 2091 2092 if(with_kerange): # Using Kerange 2093 new.register_task(multi[2], deps={new[1]: "WFK"}) 2094 new.register_task(multi[3], deps={new[0]: "DEN", new[1]: "WFK", new[2]: "KERANGE.nc"}) 2095 taskNumber = 4 # We have 2 more task 2096 2097 #new.register_task(multi[taskNumber], deps=[Dependency(new[taskNumber-1], "WFK"), 2098 # Dependency(ddb_path, "DDB"), 2099 # Dependency(dvdb_path, "DVDB")]) 2100 #new.register_task(multi[taskNumber], deps=[{new[taskNumber-1]: "WFK"},(ddb_path, "DDB"),(dvdb_path, "DVDB")]) 2101 new.register_task(multi[taskNumber], deps={new[taskNumber-1]: "WFK", ddb_path: "DDB", dvdb_path: "DVDB"}) 2102 2103 for task in new: 2104 task.set_work(new) 2105 2106 # Manual Paralellization since autoparal doesn't work with optdriver=8 (t2) 2107 if flow is not None : 2108 new.set_flow(flow) 2109 if nbr_proc is not None: 2110 for task in new[2:]: 2111 task.with_fixed_mpi_omp(nbr_proc, omp_nbr_thread) 2112 return new 2113