# coding: utf-8
import os
import tempfile
import shutil
import abipy.data as abidata

from monty.functools import lazy_property
from pymatgen.core.lattice import Lattice
from abipy.core.structure import Structure
from abipy.flowtk.launcher import BatchLauncher
from abipy.flowtk.flows import *
from abipy.flowtk.works import *
from abipy.flowtk.tasks import *
from abipy.core.testing import AbipyTest
from abipy import abilab
from abipy import flowtk


class FakeAbinitInput(object):
    """Emulate an Abinit input."""

    @lazy_property
    def pseudos(self):
        """List with the single pseudopotential `14si.pspnc` taken from abidata."""
        return [abidata.pseudo("14si.pspnc")]

    @lazy_property
    def structure(self):
        """Structure with two Si sites built from a hard-coded lattice."""
        coords = [
            [0, 0, 0],
            [0.75, 0.5, 0.75],
        ]
        lattice = Lattice([[3.8401979337, 0.00, 0.00],
                           [1.9200989668, 3.3257101909, 0.00],
                           [0.00, -2.2171384943, 3.1355090603]])
        return Structure(lattice, ["Si", "Si"], coords)

    def get(self, key, default=None):
        """
        The real AbinitInput is a dict-like object.

        This fake returns `default` when it is not None, otherwise it
        echoes back `key`.
        """
        if default is not None:
            return default
        return key


class FlowUnitTest(AbipyTest):
    """Provides helper function for testing Abinit flows."""

    # YAML configuration parsed by TaskManager.from_string in setUp.
    MANAGER = """\
policy:
    autoparal: 1
qadapters:
    - &batch
      priority: 1
      queue:
        qtype: slurm
        qname: Oban
        qparams:
            mail_user: nobody@nowhere
      limits:
        timelimit: 0:20:00
        min_cores: 4
        max_cores: 12
        #condition: {"$eq": {omp_threads: 2}}
      hardware:
        num_nodes: 10
        sockets_per_node: 1
        cores_per_socket: 2
        mem_per_node: 4 Gb
      job:
        modules:
            - intel/compilerpro/13.0.1.117
            - fftw3/intel/3.3
        shell_env:
            PATH: /home/user/tmp_intel13/src/98_main/:/home/user//NAPS/intel13/bin:$PATH
            LD_LIBRARY_PATH: /home/user/NAPS/intel13/lib:$LD_LIBRARY_PATH
        mpi_runner: mpirun

# Connection to the MongoDb database (optional)
db_connector:
    database: abinit
    collection: test
    #host: 0.0.0.0
    #port: 8080
    #user: gmatteo
    #password: helloworld

batch_adapter: *batch
"""

    def setUp(self):
        """Initialization phase."""
        super().setUp()

        # Temporary directory for the flow.
        self.workdir = tempfile.mkdtemp()

        # Create the TaskManager.
        self.manager = TaskManager.from_string(self.MANAGER)

        # Fake input file
        self.fake_input = FakeAbinitInput()

    def tearDown(self):
        """Delete workdir"""
        shutil.rmtree(self.workdir)
        # Mirror the super().setUp() call done in setUp.
        super().tearDown()


class FlowTest(FlowUnitTest):
    """Tests for the Flow object built with fake inputs."""

    def test_base(self):
        """Testing Flow..."""
        aequal = self.assertEqual
        flow = Flow(workdir=self.workdir, manager=self.manager)
        assert flow.isinstance(Flow)
        assert not flow.isinstance(None)
        assert not flow.has_scheduler

        # Build a work with a task
        work = flow.register_task(self.fake_input)
        assert work.is_work
        assert len(work.color_hex) == 7
        assert work.color_hex.startswith("#")
        task0_w0 = work[0]
        assert task0_w0.is_task
        str(task0_w0.status.colored)
        assert len(flow) == 1
        assert flow.num_tasks == 1
        assert flow.has_db

        #print(task0_w0.input_structure)
        str(task0_w0.make_input)

        # Task history
        assert len(task0_w0.history) == 0
        task0_w0.history.info("Hello %s", "world")
        assert len(task0_w0.history) == 1
        str(task0_w0.history)
        record = task0_w0.history.pop()
        print(record, repr(record))
        assert record.get_message(asctime=False) == "Hello world"
        assert len(task0_w0.history) == 0
        assert flow.select_tasks(nids=task0_w0.node_id)[0] == task0_w0
        assert flow.select_tasks(wslice=slice(0, 1, 1)) == [task0_w0]
        assert flow.select_tasks(task_class="DfptTask") == []
        assert flow.get_task_scfcycles() == []

        # Build a workflow containing two tasks depending on task0_w0
        work = Work()
        assert work.is_work
        work.register(self.fake_input)
        work.register(self.fake_input)
        assert len(work) == 2

        flow.register_work(work, deps={task0_w0: "WFK"})
        assert flow.is_flow
        assert len(flow) == 2

        # Add another work without dependencies.
        task0_w2 = flow.register_task(self.fake_input)[0]
        assert len(flow) == 3
        assert not flow.is_work

        # Allocate internal tables
        flow.allocate()

        # Check dependencies.
        #task0_w1 = flow[1][0]
        assert flow[1].depends_on(task0_w0)
        assert flow[1][0].depends_on(task0_w0)
        assert flow[1][0] in task0_w0.get_children()
        assert task0_w0 in flow[1][0].get_parents()
        assert flow[1][0].find_parent_with_ext("WFK") == task0_w0
        assert flow[1][0].find_parent_with_ext("FOOBAR") is None
        assert not flow[2][0].depends_on(task0_w0)
        assert flow[2][0] not in task0_w0.get_children()
        assert task0_w0 not in flow[2][0].get_parents()
        assert flow[1].pos == 1
        assert flow[1][0].pos == (1, 0)
        assert flow[2][0].pos == (2, 0)

        assert not flow.all_ok
        assert flow.num_tasks == 4
        assert flow.ncores_used == 0

        # API for iterations
        aequal(len(list(flow.iflat_tasks(status="Initialized"))), sum(len(work) for work in flow))
        aequal(list(flow.iflat_tasks(nids=task0_w0.node_id)), [task0_w0])

        aequal([task0_w0], flow.tasks_from_nids(task0_w0.node_id))
        aequal([(0, 0)], flow.wti_from_nids(task0_w0.node_id))
        aequal([task0_w2], flow.tasks_from_nids([task0_w2.node_id]))
        aequal([(2, 0)], flow.wti_from_nids([task0_w2.node_id]))

        # Check for deadlocks
        flow.check_dependencies()

        # Save the flow in pickle format.
        flow.build_and_pickle_dump()

        # Find the pickle file in workdir and recreate the flow.
        same_flow = Flow.pickle_load(self.workdir)
        assert same_flow == flow

        # to/from string
        # FIXME This does not work with py3k
        #s = flow.pickle_dumps(protocol=0)
        #same_flow = Flow.pickle_loads(s)
        #aequal(same_flow, flow)

        self.assertMSONable(flow)

        flow.show_info()
        flow.show_summary()
        flow.show_inputs()
        flow.show_inputs(varnames="znucl")

        df_vars = flow.get_vars_dataframe("ecut", "acell")
        assert "ecut" in df_vars

        # Test show_status
        flow.show_status()
        flow.show_tricky_tasks()
        flow.show_event_handlers()

        df = flow.compare_abivars(varnames=["ecut", "natom"], printout=True, with_colors=True)
        assert "ecut" in df

        dfs = flow.compare_structures(with_spglib=False, verbose=2, printout=True, with_colors=True)
        assert "alpha" in dfs.lattice

        dfs, ebands_plotter = flow.compare_ebands(verbose=0)
        assert not dfs and not ebands_plotter

        dfs, hist_plotter = flow.compare_hist(with_spglib=False, verbose=2, printout=True, with_colors=True)
        assert not dfs and not hist_plotter

        if self.has_networkx():
            assert flow.plot_networkx(mode="network", with_edge_labels=False, arrows=False,
                      node_size="num_cores", node_label="name_class", layout_type="spring", show=False)
            assert flow.plot_networkx(mode="status", with_edge_labels=True, arrows=True,
                      node_size="num_cores", node_label="name_class", layout_type="spring", show=False)

        if self.has_python_graphviz():
            assert flow.get_graphviz(engine="automatic", graph_attr=None, node_attr=None, edge_attr=None)
            assert flow.graphviz_imshow(ax=None, figsize=None, dpi=300, fmt="png", show=False)

        if self.has_panel():
            assert hasattr(flow.get_panel(), "show")

    def test_workdir(self):
        """Testing if one can use workdir=None in flow.__init__ and then flow.allocate(workdir)."""
        flow = Flow(workdir=None, manager=self.manager)
        flow.register_task(self.fake_input)
        #flow.register_work(work)
        work = Work()
        work.register_scf_task(self.fake_input)
        flow.register_work(work)

        # If flow.workdir is None, we should use flow.allocate(workdir)
        with self.assertRaises(RuntimeError):
            flow.allocate()

        tmpdir = tempfile.mkdtemp()
        # Remove the extra temporary directory once the test is over
        # (tearDown only deletes self.workdir).
        self.addCleanup(shutil.rmtree, tmpdir, ignore_errors=True)
        flow.allocate(workdir=tmpdir)

        str(flow)
        assert len(flow) == 2
        flow.build()

        for i, work in enumerate(flow):
            assert work.workdir == os.path.join(tmpdir, "w%d" % i)
            for t, task in enumerate(work):
                assert task.workdir == os.path.join(work.workdir, "t%d" % t)

    def test_nscf_flow_with_append(self):
        """Test creation of NSCF tasks from flow with append = True"""

        def make_scf_nscf_inputs():
            """Build and return the input files for the GS-SCF and the GS-NSCF tasks."""

            multi = abilab.MultiDataset(structure=abidata.cif_file("si.cif"),
                                        pseudos=abidata.pseudos("14si.pspnc"), ndtset=2)

            # Set global variables (dataset1 and dataset2)
            multi.set_vars(ecut=6, nband=8)

            # Dataset 1 (GS-SCF run)
            multi[0].set_kmesh(ngkpt=[8, 8, 8], shiftk=[0, 0, 0])
            multi[0].set_vars(tolvrs=1e-6)

            # Dataset 2 (GS-NSCF run on a k-path)
            kptbounds = [
                [0.5, 0.0, 0.0],  # L point
                [0.0, 0.0, 0.0],  # Gamma point
                [0.0, 0.5, 0.5],  # X point
            ]

            multi[1].set_kpath(ndivsm=6, kptbounds=kptbounds)
            multi[1].set_vars(tolwfr=1e-12)

            # Return two input files for the GS and the NSCF run
            scf_input, nscf_input = multi.split_datasets()
            return scf_input, nscf_input

        scf_input, nscf_input = make_scf_nscf_inputs()
        hello_flow = flowtk.Flow(workdir=self.mkdtemp())
        hello_flow.register_scf_task(scf_input, append=True)
        hello_flow.register_nscf_task(nscf_input, deps={hello_flow[0][0]: "DEN"}, append=True)

        #flow[0].get_graphviz_dirtree()
        #abilab.print_doc(flowtk.PhononWork)

        hello_flow = flowtk.Flow(workdir=self.mkdtemp())
        hello_flow.register_scf_task(scf_input, append=True)
        hello_flow.register_nscf_task(nscf_input, deps={hello_flow[0][0]: "DEN"}, append=False)


class TestFlowInSpectatorMode(FlowUnitTest):
    """Tests for flows in spectator mode."""

    def test_spectator(self):
        """Testing spectator mode on the flow, its works and its tasks."""
        flow = Flow(workdir=self.workdir, manager=self.manager)

        work0 = Work()
        gs_task = work0.register_scf_task(self.fake_input)
        assert gs_task.isinstance(ScfTask)
        assert gs_task.isinstance("ScfTask")
        task = work0.register_scf_task(self.fake_input)
        assert task.is_abinit_task
        assert not task.is_optic_task
        assert not task.is_anaddb_task

        work1 = Work()
        work1.register_scf_task(self.fake_input)

        flow.register_work(work0)
        flow.register_work(work1)

        # Each call is issued twice on purpose: repeating it must not raise.
        flow.disconnect_signals()
        flow.disconnect_signals()

        flow.connect_signals()
        flow.connect_signals()

        for mode in [False, True]:
            flow.set_spectator_mode(mode=mode)
            assert flow.in_spectator_mode == mode
            for node in flow.iflat_nodes():
                assert node.in_spectator_mode == mode

        # iflat_nodes yields the flow itself, every work and every task.
        assert len(list(flow.iflat_nodes())) == 1 + len(flow.works) + sum(len(work) for work in flow)
        assert flow.node_from_nid(flow.node_id) == flow

        flow.set_spectator_mode(mode=False)
        flow.build_and_pickle_dump()

        # pickle load always returns a flow in spectator mode.
        flow = Flow.pickle_load(flow.workdir)
        assert flow.in_spectator_mode

        #with self.assertRaises(flow.SpectatorError): flow.pickle_dump()
        #with self.assertRaises(flow.SpectatorError): flow.make_scheduler().start()

        work = flow[0]
        assert work.send_signal(work.S_OK) is None
        #with self.assertRaises(work.SpectatorError): work.on_ok()
        #with self.assertRaises(work.SpectatorError): work.on_all_ok()

        task = work[0]
        assert task.send_signal(task.S_OK) is None
        #with self.assertRaises(task.SpectatorError): task._on_done()
        #with self.assertRaises(task.SpectatorError): task.on_ok()
        #with self.assertRaises(task.SpectatorError): task._on_ok()


class TestBatchLauncher(FlowUnitTest):
    """Tests for BatchLauncher."""

    def test_batchlauncher(self):
        """Testing BatchLauncher methods."""
        # Create the TaskManager.
        manager = TaskManager.from_string(self.MANAGER)
        print("batch_adapter", manager.batch_adapter)
        assert manager.batch_adapter is not None

        def build_flow_with_name(name):
            """Build a flow with workdir None and the given name."""
            flow = Flow(workdir=None, manager=self.manager)
            flow.set_name(name)

            flow.register_task(self.fake_input)
            work = Work()
            work.register_scf_task(self.fake_input)
            flow.register_work(work)

            return flow

        tmpdir = tempfile.mkdtemp()
        # Remove the launcher directory once the test is over
        # (tearDown only deletes self.workdir).
        self.addCleanup(shutil.rmtree, tmpdir, ignore_errors=True)
        batch = BatchLauncher(workdir=tmpdir, manager=manager)
        str(batch)

        flow0 = build_flow_with_name("flow0")
        flow1 = build_flow_with_name("flow1")
        flow2_same_name = build_flow_with_name("flow1")

        batch.add_flow(flow0)

        # Cannot add the same flow twice.
        with self.assertRaises(batch.Error):
            batch.add_flow(flow0)

        batch.add_flow(flow1)

        # Cannot add two flows with the same name.
        with self.assertRaises(batch.Error):
            batch.add_flow(flow2_same_name)

        batch.submit(dry_run=True)

        for i, flow in enumerate([flow0, flow1]):
            assert flow.workdir == os.path.join(batch.workdir, "flow%d" % i)

        batch.pickle_dump()
        batch_from_pickle = BatchLauncher.pickle_load(batch.workdir)
        assert all(f1 == f2 for f1, f2 in zip(batch.flows, batch_from_pickle.flows))