# coding: utf-8
import os
import tempfile
import shutil
import abipy.data as abidata

from monty.functools import lazy_property
from pymatgen.core.lattice import Lattice
from abipy.core.structure import Structure
from abipy.flowtk.launcher import BatchLauncher
from abipy.flowtk.flows import *
from abipy.flowtk.works import *
from abipy.flowtk.tasks import *
from abipy.core.testing import AbipyTest
from abipy import abilab
from abipy import flowtk


class FakeAbinitInput(object):
    """Emulate an Abinit input."""
    @lazy_property
    def pseudos(self):
        return [abidata.pseudo("14si.pspnc")]

    @lazy_property
    def structure(self):
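        # Primitive cell of silicon (two atoms), in fractional coordinates.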
        coords = []
        coords.append([0, 0, 0])
        coords.append([0.75, 0.5, 0.75])
        lattice = Lattice([[3.8401979337, 0.00, 0.00],
                          [1.9200989668, 3.3257101909, 0.00],
                          [0.00, -2.2171384943, 3.1355090603]])
        return Structure(lattice, ["Si", "Si"], coords)

    def get(self, key, default=None):
        """The real AbinitInput is a dict-like object."""
        if default is not None: return default
        return key
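
    # Note: the real AbinitInput behaves like a dict, e.g. ``inp.get("nband", 8)``
    # returns the stored value or the given default. This fake only mimics that
    # interface: it returns the default if one is given, else the key itself.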


class FlowUnitTest(AbipyTest):
    """Provides helper functions for testing Abinit flows."""
    MANAGER = """\
policy:
    autoparal: 1
qadapters:
    - &batch
      priority: 1
      queue:
        qtype: slurm
        qname: Oban
        qparams:
            mail_user: nobody@nowhere
      limits:
        timelimit: 0:20:00
        min_cores: 4
        max_cores: 12
        #condition: {"$eq": {omp_threads: 2}}
      hardware:
        num_nodes: 10
        sockets_per_node: 1
        cores_per_socket: 2
        mem_per_node: 4 Gb
      job:
        modules:
            - intel/compilerpro/13.0.1.117
            - fftw3/intel/3.3
        shell_env:
            PATH: /home/user/tmp_intel13/src/98_main/:/home/user/NAPS/intel13/bin:$PATH
            LD_LIBRARY_PATH: /home/user/NAPS/intel13/lib:$LD_LIBRARY_PATH
        mpi_runner: mpirun

# Connection to the MongoDb database (optional)
db_connector:
    database: abinit
    collection: test
    #host: 0.0.0.0
    #port: 8080
    #user: gmatteo
    #password: helloworld

batch_adapter: *batch
"""
    def setUp(self):
        """Initialization phase."""
        super().setUp()

        # Temporary directory for the flow.
        self.workdir = tempfile.mkdtemp()

        # Create the TaskManager.
        self.manager = TaskManager.from_string(self.MANAGER)

        # Fake input file
        self.fake_input = FakeAbinitInput()

    def tearDown(self):
        """Delete workdir"""
        shutil.rmtree(self.workdir)


class FlowTest(FlowUnitTest):

    def test_base(self):
        """Testing Flow..."""
        aequal = self.assertEqual
        flow = Flow(workdir=self.workdir, manager=self.manager)
        assert flow.isinstance(Flow)
        assert not flow.isinstance(None)
        assert not flow.has_scheduler

        # Build a work with a task
        work = flow.register_task(self.fake_input)
        assert work.is_work
        assert len(work.color_hex) == 7
        assert work.color_hex.startswith("#")
        task0_w0 = work[0]
        assert task0_w0.is_task
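        # The bare str(...) calls in this test only exercise __str__/__repr__;
        # their output is not asserted.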
        str(task0_w0.status.colored)
        assert len(flow) == 1
        assert flow.num_tasks == 1
        assert flow.has_db

        #print(task0_w0.input_structure)
        str(task0_w0.make_input)

        # Task history
        assert len(task0_w0.history) == 0
        task0_w0.history.info("Hello %s", "world")
        assert len(task0_w0.history) == 1
        str(task0_w0.history)
        record = task0_w0.history.pop()
        print(record, repr(record))
        assert record.get_message(asctime=False) == "Hello world"
        assert len(task0_w0.history) == 0
        assert flow.select_tasks(nids=task0_w0.node_id)[0] == task0_w0
        assert flow.select_tasks(wslice=slice(0, 1, 1)) == [task0_w0]
        assert flow.select_tasks(task_class="DfptTask") == []
        assert flow.get_task_scfcycles() == []

        # Build a workflow containing two tasks depending on task0_w0
        work = Work()
        assert work.is_work
        work.register(self.fake_input)
        work.register(self.fake_input)
        assert len(work) == 2

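        # The deps dict maps a parent node to the extension of the file it
        # must produce ("WFK" here) before the dependent tasks can start.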
        flow.register_work(work, deps={task0_w0: "WFK"})
        assert flow.is_flow
        assert len(flow) == 2

        # Add another work without dependencies.
        task0_w2 = flow.register_task(self.fake_input)[0]
        assert len(flow) == 3
        assert not flow.is_work

        # Allocate internal tables
        flow.allocate()
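        # allocate() propagates the flow workdir down to works and tasks
        # (w0/t0, w0/t1, ... as checked in test_workdir) and builds the
        # internal dependency tables used below.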

        # Check dependencies.
        #task0_w1 = flow[1][0]
        assert flow[1].depends_on(task0_w0)
        assert flow[1][0].depends_on(task0_w0)
        assert flow[1][0] in task0_w0.get_children()
        assert task0_w0 in flow[1][0].get_parents()
        assert flow[1][0].find_parent_with_ext("WFK") == task0_w0
        assert flow[1][0].find_parent_with_ext("FOOBAR") is None
        assert not flow[2][0].depends_on(task0_w0)
        assert flow[2][0] not in task0_w0.get_children()
        assert task0_w0 not in flow[2][0].get_parents()
        assert flow[1].pos == 1
        assert flow[1][0].pos == (1, 0)
        assert flow[2][0].pos == (2, 0)

        assert not flow.all_ok
        assert flow.num_tasks == 4
        assert flow.ncores_used == 0

        # API for iterations
        aequal(len(list(flow.iflat_tasks(status="Initialized"))), sum(len(work) for work in flow))
        aequal(list(flow.iflat_tasks(nids=task0_w0.node_id)), [task0_w0])

        aequal([task0_w0], flow.tasks_from_nids(task0_w0.node_id))
        aequal([(0, 0)], flow.wti_from_nids(task0_w0.node_id))
        aequal([task0_w2], flow.tasks_from_nids([task0_w2.node_id]))
        aequal([(2, 0)], flow.wti_from_nids([task0_w2.node_id]))

        # Check for deadlocks
        flow.check_dependencies()

        # Save the flow in pickle format.
        flow.build_and_pickle_dump()

        # Find the pickle file in workdir and recreate the flow.
        same_flow = Flow.pickle_load(self.workdir)
        assert same_flow == flow

        # to/from string
        # FIXME This does not work with py3k
        #s = flow.pickle_dumps(protocol=0)
        #same_flow = Flow.pickle_loads(s)
        #aequal(same_flow, flow)

        self.assertMSONable(flow)

        flow.show_info()
        flow.show_summary()
        flow.show_inputs()
        flow.show_inputs(varnames="znucl")

        df_vars = flow.get_vars_dataframe("ecut", "acell")
        assert "ecut" in df_vars

        # Test show_status
        flow.show_status()
        flow.show_tricky_tasks()
        flow.show_event_handlers()

        df = flow.compare_abivars(varnames=["ecut", "natom"], printout=True, with_colors=True)
        assert "ecut" in df

        dfs = flow.compare_structures(with_spglib=False, verbose=2, printout=True, with_colors=True)
        assert "alpha" in dfs.lattice

        dfs, ebands_plotter = flow.compare_ebands(verbose=0)
        assert not dfs and not ebands_plotter

        dfs, hist_plotter = flow.compare_hist(with_spglib=False, verbose=2, printout=True, with_colors=True)
        assert not dfs and not hist_plotter

        if self.has_networkx():
            assert flow.plot_networkx(mode="network", with_edge_labels=False, arrows=False,
                      node_size="num_cores", node_label="name_class", layout_type="spring", show=False)
            assert flow.plot_networkx(mode="status", with_edge_labels=True, arrows=True,
                      node_size="num_cores", node_label="name_class", layout_type="spring", show=False)

        if self.has_python_graphviz():
            assert flow.get_graphviz(engine="automatic", graph_attr=None, node_attr=None, edge_attr=None)
            assert flow.graphviz_imshow(ax=None, figsize=None, dpi=300, fmt="png", show=False)

        if self.has_panel():
            assert hasattr(flow.get_panel(), "show")

    def test_workdir(self):
        """Testing if one can use workdir=None in flow.__init__ and then flow.allocate(workdir)."""
        flow = Flow(workdir=None, manager=self.manager)
        flow.register_task(self.fake_input)
        #flow.register_work(work)
        work = Work()
        work.register_scf_task(self.fake_input)
        flow.register_work(work)

        # If flow.workdir is None, we must use flow.allocate(workdir).
        with self.assertRaises(RuntimeError): flow.allocate()

        tmpdir = tempfile.mkdtemp()
        flow.allocate(workdir=tmpdir)

        str(flow)
        assert len(flow) == 2
        flow.build()

        for i, work in enumerate(flow):
            assert work.workdir == os.path.join(tmpdir, "w%d" % i)
            for t, task in enumerate(work):
                assert task.workdir == os.path.join(work.workdir, "t%d" % t)

    def test_nscf_flow_with_append(self):
        """Test creation of NSCF tasks from a flow with append=True."""

        def make_scf_nscf_inputs():
            """Build and return the input files for the GS-SCF and the GS-NSCF tasks."""

            multi = abilab.MultiDataset(structure=abidata.cif_file("si.cif"),
                                        pseudos=abidata.pseudos("14si.pspnc"), ndtset=2)

            # Set global variables (dataset1 and dataset2)
            multi.set_vars(ecut=6, nband=8)

            # Dataset 1 (GS-SCF run)
            multi[0].set_kmesh(ngkpt=[8, 8, 8], shiftk=[0, 0, 0])
            multi[0].set_vars(tolvrs=1e-6)

            # Dataset 2 (GS-NSCF run on a k-path)
            kptbounds = [
                [0.5, 0.0, 0.0], # L point
                [0.0, 0.0, 0.0], # Gamma point
                [0.0, 0.5, 0.5], # X point
            ]

            multi[1].set_kpath(ndivsm=6, kptbounds=kptbounds)
            multi[1].set_vars(tolwfr=1e-12)

            # Return two input files for the GS and the NSCF run
            scf_input, nscf_input = multi.split_datasets()
            return scf_input, nscf_input

        scf_input, nscf_input = make_scf_nscf_inputs()
        hello_flow = flowtk.Flow(workdir=self.mkdtemp())
        hello_flow.register_scf_task(scf_input, append=True)
        hello_flow.register_nscf_task(nscf_input, deps={hello_flow[0][0]: "DEN"}, append=True)
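        # With append=True the NSCF task should be appended to the most
        # recently registered work, so both tasks live in the same work.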

        #flow[0].get_graphviz_dirtree()
        #abilab.print_doc(flowtk.PhononWork)

        hello_flow = flowtk.Flow(workdir=self.mkdtemp())
        hello_flow.register_scf_task(scf_input, append=True)
        hello_flow.register_nscf_task(nscf_input, deps={hello_flow[0][0]: "DEN"}, append=False)
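        # With append=False, by contrast, the NSCF task should be placed
        # in a new work of its own.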


class TestFlowInSpectatorMode(FlowUnitTest):

    def test_spectator(self):
        """Testing spectator mode."""
        flow = Flow(workdir=self.workdir, manager=self.manager)

        work0 = Work()
        gs_task = work0.register_scf_task(self.fake_input)
        assert gs_task.isinstance(ScfTask)
        assert gs_task.isinstance("ScfTask")
        task = work0.register_scf_task(self.fake_input)
        assert task.is_abinit_task
        assert not task.is_optic_task
        assert not task.is_anaddb_task

        work1 = Work()
        work1.register_scf_task(self.fake_input)

        flow.register_work(work0)
        flow.register_work(work1)

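        # Connecting/disconnecting the signals twice in a row must be harmless
        # (the calls are expected to be idempotent).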
        flow.disconnect_signals()
        flow.disconnect_signals()

        flow.connect_signals()
        flow.connect_signals()

        for mode in [False, True]:
            flow.set_spectator_mode(mode=mode)
            assert flow.in_spectator_mode == mode
            for node in flow.iflat_nodes():
                assert node.in_spectator_mode == mode

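        # iflat_nodes() yields the flow itself, all its works and all their tasks.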
        assert len(list(flow.iflat_nodes())) == 1 + len(flow.works) + sum(len(work) for work in flow)
        assert flow.node_from_nid(flow.node_id) == flow

        flow.set_spectator_mode(mode=False)
        flow.build_and_pickle_dump()

        # pickle_load always returns a flow in spectator mode.
        flow = Flow.pickle_load(flow.workdir)
        assert flow.in_spectator_mode

        #with self.assertRaises(flow.SpectatorError): flow.pickle_dump()
        #with self.assertRaises(flow.SpectatorError): flow.make_scheduler().start()

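        # In spectator mode, signals are not dispatched: send_signal returns None.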
        work = flow[0]
        assert work.send_signal(work.S_OK) is None
        #with self.assertRaises(work.SpectatorError): work.on_ok()
        #with self.assertRaises(work.SpectatorError): work.on_all_ok()

        task = work[0]
        assert task.send_signal(task.S_OK) is None
        #with self.assertRaises(task.SpectatorError): task._on_done()
        #with self.assertRaises(task.SpectatorError): task.on_ok()
        #with self.assertRaises(task.SpectatorError): task._on_ok()


class TestBatchLauncher(FlowUnitTest):

    def test_batchlauncher(self):
        """Testing BatchLauncher methods."""
        # Create the TaskManager.
        manager = TaskManager.from_string(self.MANAGER)
        print("batch_adapter", manager.batch_adapter)
        assert manager.batch_adapter is not None

        def build_flow_with_name(name):
            """Build a flow with workdir None and the given name."""
            flow = Flow(workdir=None, manager=self.manager)
            flow.set_name(name)

            flow.register_task(self.fake_input)
            work = Work()
            work.register_scf_task(self.fake_input)
            flow.register_work(work)

            return flow

        tmpdir = tempfile.mkdtemp()
        batch = BatchLauncher(workdir=tmpdir, manager=manager)
        str(batch)

        flow0 = build_flow_with_name("flow0")
        flow1 = build_flow_with_name("flow1")
        flow2_same_name = build_flow_with_name("flow1")

        batch.add_flow(flow0)

        # Cannot add the same flow twice.
        with self.assertRaises(batch.Error):
            batch.add_flow(flow0)

        batch.add_flow(flow1)

        # Cannot add two flows with the same name.
        with self.assertRaises(batch.Error):
            batch.add_flow(flow2_same_name)

        batch.submit(dry_run=True)

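        # Each flow is allocated below batch.workdir, in a directory named
        # after the flow ("flow0" and "flow1" here).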
        for i, flow in enumerate([flow0, flow1]):
            assert flow.workdir == os.path.join(batch.workdir, "flow%d" % i)

        batch.pickle_dump()
        batch_from_pickle = BatchLauncher.pickle_load(batch.workdir)
        assert all(f1 == f2 for f1, f2 in zip(batch.flows, batch_from_pickle.flows))